
在使用pyserial与串口设备通信时,开发者常遇到发送命令后无法读取到预期数据的问题。这通常是由于设备未启用回显模式,而非pyserial连接失败。本文将深入探讨这一常见误区,并提供通过发送特定触发命令、利用`readline()`等方法有效接收串口数据的专业教程,确保稳定可靠的数据交互。
Python的pySerial库是进行串口通信的强大工具,广泛应用于与各种硬件设备(如传感器、仪表、微控制器)进行数据交互。然而,初学者在尝试连接设备并发送命令后,可能会发现即使连接成功,也无法通过ser.in_waiting或ser.read()等方法读取到任何数据,这往往令人困惑。
一个常见的误解是,当通过ser.write()发送数据后,设备会自动将接收到的数据“回显”回来。实际上,大多数串口设备并不会默认开启回显模式。它们通常只在接收到特定的命令后,才会执行操作并发送预定义的数据响应。一些终端工具(如Termite、Minicom)可能内置了“本地回显”功能,这会让人误以为设备在回显,但实际上是终端软件在本地显示了你发送的字符。
因此,如果设备没有被编程为回显,或者没有收到触发其响应的特定命令,ser.in_waiting自然会返回0,因为设备端根本没有发送任何数据回来。
在尝试与设备通信之前,首先需要正确配置并打开串口。以下是一个典型的pySerial串口配置示例:
import serial
import time
def setup_serial_connection(port='COM4', baudrate=9600, timeout=1):
"""
配置并返回一个pySerial串口对象。
:param port: 串口名称,例如'COM4'或'/dev/ttyUSB0'
:param baudrate: 波特率
:param timeout: 读取超时时间(秒)
:return: 配置好的串口对象
"""
ser = serial.Serial()
ser.port = port
ser.baudrate = baudrate
ser.bytesize = serial.EIGHTBITS # 8位数据位
ser.stopbits = serial.STOPBITS_ONE # 1位停止位
ser.parity = serial.PARITY_NONE # 无奇偶校验
ser.xonxoff = False # 禁用软件流控制
ser.rtscts = False # 禁用硬件流控制 (RTS/CTS)
ser.dsrdtr = False # 禁用硬件流控制 (DSR/DTR)
ser.timeout = timeout # 读取超时时间
try:
ser.open()
if ser.is_open:
print(f"成功打开串口: {ser.port}")
return ser
except serial.SerialException as e:
print(f"无法打开串口 {ser.port}: {e}")
return None
return None
# 示例使用
# ser_connection = setup_serial_connection(port='COM4', baudrate=9600, timeout=1)
# if ser_connection:
# # 进行通信
# ser_connection.close()注意事项:
鉴于设备不回显的特性,正确的策略是发送一个明确会触发设备响应的命令,然后等待并读取其响应。
许多设备在接收到特定命令后会返回以换行符(\n)或回车符(\r)结尾的行数据。在这种情况下,ser.readline()是最高效的读取方法。
import serial
import time
# 假设我们已经通过 setup_serial_connection 函数获取了 ser 对象
# 例如:
# ser = setup_serial_connection(port='COM4', baudrate=9600, timeout=1)
# 模拟一个已打开的串口对象
class MockSerial:
def __init__(self):
self.is_open = True
self.buffer = b''
self.timeout = 1
self.port = 'COM_MOCK'
def open(self):
self.is_open = True
def close(self):
self.is_open = False
def write(self, data):
print(f"MockSerial: 发送数据: {data.decode('utf-8').strip()}")
# 模拟设备响应:如果收到'K',则返回'0309\n'
if data == b'K':
self.buffer += b'0309\n'
# 模拟其他命令响应
elif data == b'GET_TEMP':
self.buffer += b'25.5C\n'
# 模拟设备处理时间
time.sleep(0.1)
def readline(self):
start_time = time.time()
while b'\n' not in self.buffer:
if self.timeout is not None and (time.time() - start_time > self.timeout):
return b'' # 超时返回空字节串
time.sleep(0.01) # 等待数据
line_end_index = self.buffer.find(b'\n')
if line_end_index != -1:
line = self.buffer[:line_end_index + 1]
self.buffer = self.buffer[line_end_index + 1:]
return line
return b'' # 不应该发生
@property
def in_waiting(self):
return len(self.buffer)
# 替换为实际的串口对象
ser = MockSerial() # 在实际应用中,这里会是 setup_serial_connection() 的返回值
if ser and ser.is_open:
try:
# 发送触发命令。根据设备协议,这可能是'K'、'READ'、'GET_DATA'等
command = b'K' # 假设'K'命令会触发设备返回型号
ser.write(command)
time.sleep(0.1) # 给予设备一点时间处理命令并发送响应
# 检查是否有数据等待,但这不是主要读取方式
buffer_size = ser.in_waiting
print(f"串口等待中的字节数 (在readline之前): {buffer_size}")
# 使用 readline() 读取设备响应
print("开始读取设备响应...")
response_lines = []
while True:
line = ser.readline()
if not line: # 如果readline返回空字节串,表示超时或没有更多数据
break
try:
# 尝试以UTF-8解码,并去除首尾空白符(包括换行符)
decoded_line = line.decode('utf-8').strip()
response_lines.append(decoded_line)
print(f"接收到: {decoded_line}")
except UnicodeDecodeError:
# 如果解码失败,打印原始十六进制数据以供调试
print(f"解码失败,原始数据(hex): {line.hex()}")
except Exception as e:
print(f"处理数据时发生错误: {e}")
break
if response_lines:
print("\n所有接收到的响应:")
for resp in response_lines:
print(resp)
else:
print("未从设备接收到任何响应。请检查命令和设备状态。")
except serial.SerialException as e:
print(f"串口通信错误: {e}")
finally:
if ser.is_open:
ser.close()
print("串口已关闭。")
else:
print("串口未成功打开,无法进行通信。")
代码解析:
如果设备不以行结尾符响应,或者你需要读取固定长度的数据块,可以使用ser.read(size)。
# ... (串口初始化代码同上) ...
# 假设 ser 已经是一个打开的串口对象
# ser = setup_serial_connection(port='COM4', baudrate=9600, timeout=1)
# 使用 MockSerial 模拟
ser = MockSerial()
ser.timeout = 0.5 # 确保read有超时
if ser and ser.is_open:
try:
command = b'GET_DATA_BLOCK' # 假设这个命令会触发设备发送一个固定长度的数据块
ser.write(command)
time.sleep(0.1)
# 假设我们知道设备会返回10个字节的数据
expected_data_length = 10
received_data = b''
# 循环读取直到达到预期长度或超时
while len(received_data) < expected_data_length:
chunk = ser.read(expected_data_length - len(received_data))
if not chunk: # 超时或没有更多数据
break
received_data += chunk
print(f"已接收 {len(received_data)} / {expected_data_length} 字节")
if received_data:
print(f"\n接收到的原始数据 (字节): {received_data}")
try:
print(f"解码后的数据: {received_data.decode('ascii').strip()}")
except UnicodeDecodeError:
print(f"解码失败,原始数据(hex): {received_data.hex()}")
else:
print("未从设备接收到任何数据。")
except serial.SerialException as e:
print(f"串口通信错误: {e}")
finally:
if ser.is_open:
ser.close()
print("串口已关闭。")ser.read(size)的特点:
如果设备持续发送数据,你可能需要一个循环来不断读取。
# ... (串口初始化代码同上) ...
# ser = setup_serial_connection(port='COM4', baudrate=9600, timeout=0.1) # 较短的超时
# 使用 MockSerial 模拟,模拟持续发送数据
class ContinuousMockSerial(MockSerial):
def __init__(self):
super().__init__()
self.counter = 0
def write(self, data):
print(f"MockSerial: 发送数据: {data.decode('utf-8').strip()}")
# 模拟设备收到'START'后开始持续发送数据
if data == b'START':
self.start_time = time.time()
self.sending = True
elif data == b'STOP':
self.sending = False
def readline(self):
if hasattr(self, 'sending') and self.sending and (time.time() - self.start_time) > 0.1:
self.buffer += f"Data_Point_{self.counter}\n".encode('utf-8')
self.counter += 1
self.start_time = time.time() # 重置发送时间
return super().readline()
ser = ContinuousMockSerial()
ser.timeout = 0.5 # 确保readline有超时
if ser and ser.is_open:
print("开始监听连续数据...")
try:
ser.write(b'START') # 告知设备开始发送数据
for i in range(10): # 循环读取10次
line = ser.readline()
if line:
try:
print(f"接收到: {line.decode('utf-8').strip()}")
except UnicodeDecodeError:
print(f"解码失败,原始数据(hex): {line.hex()}")
else:
print("未接收到数据,可能设备停止发送或超时。")
time.sleep(0.2) # 模拟处理间隔
ser.write(b'STOP') # 告知设备停止发送数据
print("停止监听。")
except serial.SerialException as e:
print(f"串口通信错误: {e}")
finally:
if ser.is_open:
ser.close()
print("串口已关闭。")遵循这些原则,你将能更有效地使用pySerial与各种串口设备进行稳定可靠的通信。
以上就是使用pySerial进行串口通信:数据接收的常见陷阱与最佳实践的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号