
在嵌入式系统或工业控制等领域,串行通信(如uart、rs232/485)是设备间数据交换的常见方式。当应用程序涉及多个并发任务(线程)需要与同一个串行设备交互时,直接从不同线程操作串行端口会导致严重的问题。这并非因为位级别的混淆(操作系统内核驱动程序通常会处理底层i/o的原子性),而是因为大多数串行设备遵循严格的“请求-响应”协议,即设备在处理完一个请求并返回响应之前,无法处理新的请求。多线程同时发送请求会破坏这一协议,导致设备状态混乱或返回错误数据。因此,构建一个高级抽象层来管理串行端口的并发访问至关重要。
这种方案的核心思想是引入一个独立的、专用的通信线程,作为所有串行I/O操作的唯一执行者。其他需要与串行设备通信的线程,不再直接操作串口,而是将它们的请求封装成消息,并通过一个共享的请求队列发送给这个通信线程。通信线程则按顺序处理这些请求,执行串口的写入和读取操作,并将响应数据返回给相应的请求线程。
以下是一个使用Python threading 和 queue 模块实现基于队列的串行通信抽象的简化示例。
import queue
import threading
import time
import random
class SerialDeviceAbstraction:
"""
通过一个专用工作线程来管理串行通信,确保请求的序列化。
"""
def __init__(self, serial_port):
self.serial_port = serial_port # 实际的串口对象,例如 pyserial 的 Serial 实例
self.request_queue = queue.Queue() # 存放待处理的请求
self._next_request_id = 0
self._stop_event = threading.Event() # 用于控制工作线程的停止
self.worker_thread = threading.Thread(target=self._worker_loop)
self.worker_thread.daemon = True # 设置为守护线程,主程序退出时自动终止
def _worker_loop(self):
"""
专用工作线程的循环,负责处理队列中的串行请求。
"""
while not self._stop_event.is_set():
try:
# 从请求队列获取请求:(request_id, query_data, response_queue)
request_id, query_data, response_queue = self.request_queue.get(timeout=0.1)
print(f"工作线程: 处理请求 ID {request_id},查询 '{query_data}'")
# --- 模拟实际的串口通信操作 ---
# self.serial_port.write(query_data.encode()) # 写入请求
# response_bytes = self.serial_port.read(8) # 读取响应,假设响应固定8字节
# response_data = response_bytes.decode()
# 模拟串口I/O延迟和响应
time.sleep(0.1 + random.random() * 0.1)
response_data = f"响应 '{query_data}' (ID:{request_id})"
print(f"工作线程: 完成请求 ID {request_id},响应 '{response_data}'")
# 将响应发送回请求线程的私有队列
response_queue.put((request_id, response_data))
self.request_queue.task_done() # 标记此任务已完成
except queue.Empty:
# 队列为空,继续等待
continue
except Exception as e:
print(f"工作线程处理请求时发生错误: {e}")
# 错误处理,例如将错误信息回传给请求线程
def start(self):
"""启动串口通信工作线程。"""
self.worker_thread.start()
print("串口通信工作线程已启动。")
def stop(self):
"""停止串口通信工作线程。"""
self._stop_event.set()
self.worker_thread.join()
print("串口通信工作线程已停止。")
def get(self, query: str) -> str:
"""
供其他线程调用的接口,发送一个查询并等待响应。
"""
request_id = self._get_next_request_id()
# 为当前请求创建一个临时的、私有的响应队列
response_queue = queue.Queue(1)
# 将请求加入共享的请求队列
self.request_queue.put((request_id, query, response_queue))
print(f"线程提交请求 '{query}',等待响应...")
# 阻塞等待响应
_req_id, response = response_queue.get()
print(f"线程收到响应: {response}")
return response
def _get_next_request_id(self):
"""生成唯一的请求ID。"""
# 注意:在多线程环境中,对 _next_request_id 的操作也应受锁保护,
# 但对于简单的递增,Python的整数操作通常是原子性的。
# 更严谨的做法是使用 threading.Lock 或 atomic counter。
with threading.Lock():
self._next_request_id += 1
return self._next_request_id
# --- 示例用法 ---
# 假设这里有一个实际的串口对象,例如:
# import serial
# my_serial_port = serial.Serial('/dev/ttyUSB0', 9600, timeout=1)
# serial_abstraction = SerialDeviceAbstraction(my_serial_port)
# 为演示目的,我们传入 None 作为串口对象
serial_abstraction = SerialDeviceAbstraction(None)
serial_abstraction.start()
def thread_foo_query():
"""持续查询 'foo' 的线程。"""
while True:
serial_abstraction.get("foo")
time.sleep(1) # 每秒查询一次
def thread_bar_query():
"""随机查询 'bar' 的线程。"""
while True:
time.sleep(random.random() * 3) # 随机延迟
serial_abstraction.get("bar")
time.sleep(random.random() * 2)
# 启动业务逻辑线程
t_foo = threading.Thread(target=thread_foo_query)
t_bar = threading.Thread(target=thread_bar_query)
t以上就是多线程环境下串行通信的高级抽象与并发处理策略的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号