
在硬件设备通信场景中,尤其涉及到串口(serial port)通信时,经常会遇到多线程并发访问的需求。例如,一个线程可能需要持续地查询设备状态(如温度),而另一个线程可能在随机时间点发送一次性控制命令。如果不对串口访问进行有效管理,直接在多个线程中调用串口读写操作,将导致以下问题:
因此,构建一个高级抽象层来管理串口通信,并自动处理底层的并发问题,是实现稳定、可靠通信的关键。
一种优雅且推荐的解决方案是引入一个专门的线程来负责所有的串口I/O操作。这个线程充当串口通信的“管家”,所有其他线程的串口请求都通过一个队列提交给它。
import queue
import threading
import time
import random
# 假设的串口操作函数
def _serial_write(data):
print(f"[Serial] Sending: {data}")
time.sleep(0.1) # 模拟写入延迟
def _serial_read(size):
# 模拟读取响应
response_map = {
"foo": "temp_data",
"bar": "config_ack"
}
# 假设响应与请求直接相关,实际可能需要解析设备返回
# 这里简化为根据发送的数据模拟返回
if "foo" in threading.current_thread().name: # 模拟foo请求的响应
return response_map["foo"]
elif "bar" in threading.current_thread().name: # 模拟bar请求的响应
return response_map["bar"]
return "unknown_response"
class SerialCommunicator:
def __init__(self):
self.request_queue = queue.Queue()
self.response_queues = {} # 存储每个请求对应的响应队列
self.serial_thread = threading.Thread(target=self._run_serial_handler, name="SerialHandlerThread")
self.serial_thread.daemon = True
self.serial_thread.start()
def _run_serial_handler(self):
while True:
request_item = self.request_queue.get()
query = request_item['query']
response_queue = request_item['response_queue']
try:
# 严格的请求-响应周期
_serial_write(query)
response = _serial_read(8) # 假设读取8字节
response_queue.put({"status": "success", "data": response})
except Exception as e:
response_queue.put({"status": "error", "message": str(e)})
finally:
self.request_queue.task_done()
def send_query(self, query):
# 每个请求创建一个临时的响应队列
response_q = queue.Queue(1)
self.request_queue.put({"query": query, "response_queue": response_q})
# 阻塞等待响应
result = response_q.get()
if result['status'] == 'success':
print(f"[{threading.current_thread().name}] Received response for '{query}': {result['data']}")
return result['data']
else:
print(f"[{threading.current_thread().name}] Error for '{query}': {result['message']}")
raise Exception(result['message'])
# 实例化串口通信抽象层
serial_device_abstraction = SerialCommunicator()
# 示例:线程1持续查询"foo"
def thread1_task():
threading.current_thread().name = "Thread-Foo"
while True:
try:
serial_device_abstraction.send_query("foo")
except Exception as e:
print(f"Thread-Foo encountered error: {e}")
time.sleep(1)
# 示例:线程2随机查询"bar"
def thread2_task():
threading.current_thread().name = "Thread-Bar"
for _ in range(5): # 模拟查询5次
time.sleep(random.uniform(0.1, 2.0))
try:
serial_device_abstraction.send_query("bar")
except Exception as e:
print(f"Thread-Bar encountered error: {e}")
# 启动线程
t1 = threading.Thread(target=thread1_task)
t2 = threading.Thread(target=thread2_task)
t1.start()
t2.start()
# 等待一段时间,观察输出
time.sleep(10)
print("Main thread exiting.")另一种相对直接的方案是使用互斥锁(Mutex)来保护串口的临界区。所有需要访问串口的线程在进行读写操作前,都必须先获取这个互斥锁。
// 假设 serial_fd 是串口的文件描述符
// 假设 serial_mutex 是一个全局或静态的互斥锁
// (在Python中,可以使用threading.Lock)
// C/C++ 伪代码示例
procedure serial_messaging(u8 *request_mesg, int rqlen, u8 *response_mesg, int rslen)
{
int rc;
acquire_the_mutex(serial_mutex); /* 获取互斥锁,否则当前线程阻塞 */
rc = write(serial_fd, request_mesg, rqlen);
if (rc < 0) {
// 处理写入错误
handle_error_condition();
release_the_mutex(serial_mutex);
return;
}
// tcdrain(serial_fd); // 对于一些系统,可能需要确保所有数据已发送,再计时或读取
rc = read(serial_fd, response_mesg, rslen); /* 使用阻塞模式等待响应 */
if (rc < 0) {
// 处理读取错误或超时
handle_error_condition();
release_the_mutex(serial_mutex);
return;
}
release_the_mutex(serial_mutex); /* 释放互斥锁,允许其他线程使用串口 */
return; /* 返回接收到的数据在 response_mesg 中 */
}
// 示例调用 (在不同的线程中)
void thread_foo_task() {
u8 request[] = "foo";
u8 response[8];
while (true) {
serial_messaging(request, sizeof(request), response, sizeof(response));
// 处理接收到的 response
sleep(1);
}
}
void thread_bar_task() {
u8 request[] = "bar";
u8 response[8];
sleep(random_delay());
serial_messaging(request, sizeof(request), response, sizeof(response));
// 处理接收到的 response
}通过采用上述任一高级抽象方案,开发者可以有效地解决多线程环境下串口通信的并发问题,构建出稳定、高效且易于维护的设备通信模块。
以上就是高并发环境下串口通信的高级抽象与实现的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号