
在嵌入式系统或PC与外部硬件设备通信的场景中,串行通信(如UART、RS232/485)扮演着核心角色。当多个应用线程需要同时向同一串行设备发送查询或接收数据时,如何构建一个高层次的抽象层,以屏蔽底层的并发控制复杂性,成为了一个关键问题。本文将深入探讨这一挑战,并提供两种主流的解决方案。
大多数串行设备,特别是简单的从属设备,都遵循严格的请求-响应协议。这意味着设备在接收到一个查询(如“foo”或“bar”)后,会一直忙碌直到发送回相应的响应。在此期间,设备通常无法处理任何新的请求。因此,主机或主设备必须尊重并强制执行这一协议:一次请求消息的传输必须紧随其后是响应的接收(或适当的超时),在当前请求-响应周期完成之前,绝不能开始另一次请求消息的传输。
直接让多个线程并行地向串行端口写入和读取,如以下简化示例所示,将导致数据混乱或协议违规:
def get(query):
serial_port.write(query)
data = serial_port.read(8)
return data这种方式的问题不在于位或字节层面的数据混合(因为操作系统内核驱动程序会处理底层的I/O,线程无法直接访问硬件并同时发送位),而在于它没有强制执行请求-响应协议。如果一个线程在设备尚未响应前发送了另一个查询,或者在读取响应完成前另一个线程开始写入,都将导致通信失败或数据错乱。核心问题是缺乏同步机制来确保对串行端口的独占访问和协议的正确执行。
一种优雅且高度抽象的解决方案是引入一个专用的线程来管理串行端口的所有I/O操作。这个线程充当串行通信的“守门员”,负责处理所有发送到设备的请求和接收设备的响应。其他需要与串行设备交互的线程,不再直接操作串行端口,而是将它们的请求封装成消息,发送到一个共享的消息队列中。
工作原理:
优点:
这种方案将串行端口的竞争完全隐藏在队列机制之后,提供了一种简洁且健壮的抽象。
另一种直接但同样有效的方法是使用互斥锁(Mutex)来保护对串行端口的访问。这种方法不引入额外的通信线程,而是让所有需要访问串行端口的线程在执行I/O操作前,先获取一个共享的互斥锁。
工作原理:
以下是使用伪代码实现的示例:
// 假设 serial_fd 是串行端口的文件描述符,
// serial_mutex 是保护它的互斥锁
// acquire_the_mutex() 和 release_the_mutex() 是互斥锁操作函数
procedure serial_messaging(u8 *request_mesg, int rqlen, u8 *response_mesg, int rslen)
{
int rc;
// 1. 获取互斥锁,确保独占访问
acquire_the_mutex(); /* 否则调用线程将被阻塞 */
// 2. 发送请求消息
rc = write(serial_fd, request_mesg, rqlen);
if (rc < 0) {
// 处理错误条件
release_the_mutex(); // 错误时也需释放锁
return;
}
// tcdrain(serial_fd); // 对于测量超时可能需要,确保所有数据已从输出缓冲区发送
// 3. 读取响应消息(使用阻塞模式等待)
rc = read(serial_fd, response_mesg, rslen);
if (rc < 0) {
// 处理错误条件
release_the_mutex(); // 错误时也需释放锁
return;
}
// 4. 释放互斥锁,允许其他线程使用串行端口
release_the_mutex(); /* 让另一个线程使用串行端口 */
return; /* 返回接收到的数据 */
}有了 serial_messaging 函数,各个应用线程就可以安全地调用它来发送“foo”或“bar”查询,而无需担心底层的并发问题。例如,在Python中,可以利用 threading.Lock 实现:
import threading
import time
import random
# 模拟串口对象和互斥锁
class MockSerialPort:
def __init__(self):
self.last_query = b""
def write(self, data):
self.last_query = data # 模拟设备知道最后查询了什么
print(f"[{threading.current_thread().name}] Sending: {data.decode()}")
time.sleep(0.05) # 模拟发送时间
def read(self, length):
response = f"response_to_{self.last_query.decode()}".encode()
print(f"[{threading.current_thread().name}] Receiving: {response.decode()}")
time.sleep(0.05) # 模拟接收时间
return response[:length] # 模拟响应长度限制
serial_port = MockSerialPort()
serial_lock = threading.Lock()
def get_with_lock(query):
with serial_lock: # 使用 with 语句自动管理锁的获取和释放
serial_port.write(query)
data = serial_port.read(8) # 假设响应长度为8
return data
def thread1_task():
while True:
response = get_with_lock(b"foo")
print(f"[{threading.current_thread().name}] received: {response.decode()}")
time.sleep(1)
def thread2_task():
time.sleep(random.random()) # 随机等待一段时间
response = get_with_lock(b"bar")
print(f"[{threading.current_thread().name}] received: {response.decode()}")
# 示例:启动两个线程
# threading.Thread(target=thread1_task, name="Thread-Foo").start()
# threading.Thread(target=thread2_task, name="Thread-Bar").start()在实现串行通信的并发抽象时,有几个常见的误区和重要考量需要注意:
以上就是高级抽象:构建稳健的并发串口通信机制的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号