使用pySerial进行串口通信:数据接收的常见陷阱与最佳实践

花韻仙語
发布: 2025-10-27 12:41:03
原创
189人浏览过

使用pySerial进行串口通信:数据接收的常见陷阱与最佳实践

在使用pyserial与串口设备通信时,开发者常遇到发送命令后无法读取到预期数据的问题。这通常是由于设备未启用回显模式,而非pyserial连接失败。本文将深入探讨这一常见误区,并提供通过发送特定触发命令、利用`readline()`等方法有效接收串口数据的专业教程,确保稳定可靠的数据交互。

pySerial串口通信基础与数据接收挑战

Python的pySerial库是进行串口通信的强大工具,广泛应用于与各种硬件设备(如传感器、仪表、微控制器)进行数据交互。然而,初学者在尝试连接设备并发送命令后,可能会发现即使连接成功,也无法通过ser.in_waiting或ser.read()等方法读取到任何数据,这往往令人困惑。

一个常见的误解是,当通过ser.write()发送数据后,设备会自动将接收到的数据“回显”回来。实际上,大多数串口设备并不会默认开启回显模式。它们通常只在接收到特定的命令后,才会执行操作并发送预定义的数据响应。一些终端工具(如Termite、Minicom)可能内置了“本地回显”功能,这会让人误以为设备在回显,但实际上是终端软件在本地显示了你发送的字符。

因此,如果设备没有被编程为回显,或者没有收到触发其响应的特定命令,ser.in_waiting自然会返回0,因为设备端根本没有发送任何数据回来。

建立pySerial连接

在尝试与设备通信之前,首先需要正确配置并打开串口。以下是一个典型的pySerial串口配置示例:

import serial
import time

def setup_serial_connection(port='COM4', baudrate=9600, timeout=1):
    """
    配置并返回一个pySerial串口对象。
    :param port: 串口名称,例如'COM4'或'/dev/ttyUSB0'
    :param baudrate: 波特率
    :param timeout: 读取超时时间(秒)
    :return: 配置好的串口对象
    """
    ser = serial.Serial()
    ser.port = port
    ser.baudrate = baudrate
    ser.bytesize = serial.EIGHTBITS  # 8位数据位
    ser.stopbits = serial.STOPBITS_ONE # 1位停止位
    ser.parity = serial.PARITY_NONE  # 无奇偶校验
    ser.xonxoff = False              # 禁用软件流控制
    ser.rtscts = False               # 禁用硬件流控制 (RTS/CTS)
    ser.dsrdtr = False               # 禁用硬件流控制 (DSR/DTR)
    ser.timeout = timeout            # 读取超时时间

    try:
        ser.open()
        if ser.is_open:
            print(f"成功打开串口: {ser.port}")
            return ser
    except serial.SerialException as e:
        print(f"无法打开串口 {ser.port}: {e}")
        return None

    return None

# 示例使用
# ser_connection = setup_serial_connection(port='COM4', baudrate=9600, timeout=1)
# if ser_connection:
#     # 进行通信
#     ser_connection.close()
登录后复制

注意事项:

  • port:根据您的操作系统设备管理器确定正确的串口号(Windows通常是COMx,Linux通常是/dev/ttyUSBx或/dev/ttySx)。
  • baudrate、bytesize、stopbits、parity:这些参数必须与您的设备制造商提供的串口通信协议完全匹配。
  • rtscts、dsrdtr、xonxoff:这些是流控制设置。根据设备要求启用或禁用,通常默认禁用。如果设备需要硬件流控制,应将rtscts或dsrdtr设置为True。
  • timeout:这是一个关键参数。它定义了read()或readline()操作等待数据的时间。如果设置为0,则非阻塞,立即返回。如果设置为None,则无限等待。对于大多数应用,设置一个合理的非零超时值是推荐的做法。

有效的数据接收策略

鉴于设备不回显的特性,正确的策略是发送一个明确会触发设备响应的命令,然后等待并读取其响应。

1. 发送触发命令并使用 readline() 读取

许多设备在接收到特定命令后会返回以换行符(\n)或回车符(\r)结尾的行数据。在这种情况下,ser.readline()是最高效的读取方法。

先见AI
先见AI

数据为基,先见未见

先见AI95
查看详情 先见AI
import serial
import time

# 假设我们已经通过 setup_serial_connection 函数获取了 ser 对象
# 例如:
# ser = setup_serial_connection(port='COM4', baudrate=9600, timeout=1)

# 模拟一个已打开的串口对象
class MockSerial:
    def __init__(self):
        self.is_open = True
        self.buffer = b''
        self.timeout = 1
        self.port = 'COM_MOCK'

    def open(self):
        self.is_open = True
    def close(self):
        self.is_open = False
    def write(self, data):
        print(f"MockSerial: 发送数据: {data.decode('utf-8').strip()}")
        # 模拟设备响应:如果收到'K',则返回'0309\n'
        if data == b'K':
            self.buffer += b'0309\n'
        # 模拟其他命令响应
        elif data == b'GET_TEMP':
            self.buffer += b'25.5C\n'
        # 模拟设备处理时间
        time.sleep(0.1)

    def readline(self):
        start_time = time.time()
        while b'\n' not in self.buffer:
            if self.timeout is not None and (time.time() - start_time > self.timeout):
                return b'' # 超时返回空字节串
            time.sleep(0.01) # 等待数据

        line_end_index = self.buffer.find(b'\n')
        if line_end_index != -1:
            line = self.buffer[:line_end_index + 1]
            self.buffer = self.buffer[line_end_index + 1:]
            return line
        return b'' # 不应该发生

    @property
    def in_waiting(self):
        return len(self.buffer)

# 替换为实际的串口对象
ser = MockSerial() # 在实际应用中,这里会是 setup_serial_connection() 的返回值

if ser and ser.is_open:
    try:
        # 发送触发命令。根据设备协议,这可能是'K'、'READ'、'GET_DATA'等
        command = b'K' # 假设'K'命令会触发设备返回型号
        ser.write(command)
        time.sleep(0.1) # 给予设备一点时间处理命令并发送响应

        # 检查是否有数据等待,但这不是主要读取方式
        buffer_size = ser.in_waiting
        print(f"串口等待中的字节数 (在readline之前): {buffer_size}")

        # 使用 readline() 读取设备响应
        print("开始读取设备响应...")
        response_lines = []
        while True:
            line = ser.readline()
            if not line: # 如果readline返回空字节串,表示超时或没有更多数据
                break
            try:
                # 尝试以UTF-8解码,并去除首尾空白符(包括换行符)
                decoded_line = line.decode('utf-8').strip()
                response_lines.append(decoded_line)
                print(f"接收到: {decoded_line}")
            except UnicodeDecodeError:
                # 如果解码失败,打印原始十六进制数据以供调试
                print(f"解码失败,原始数据(hex): {line.hex()}")
            except Exception as e:
                print(f"处理数据时发生错误: {e}")
                break

        if response_lines:
            print("\n所有接收到的响应:")
            for resp in response_lines:
                print(resp)
        else:
            print("未从设备接收到任何响应。请检查命令和设备状态。")

    except serial.SerialException as e:
        print(f"串口通信错误: {e}")
    finally:
        if ser.is_open:
            ser.close()
            print("串口已关闭。")
else:
    print("串口未成功打开,无法进行通信。")
登录后复制

代码解析:

  • ser.write(command): 发送编码后的字节串命令。
  • time.sleep(0.1): 在发送命令后短暂暂停,给设备留出处理命令和准备响应的时间。这个延迟时间需要根据设备的响应速度进行调整。
  • ser.readline(): 尝试从串口读取一行数据,直到遇到换行符\n或超时。
  • if not line: break: 如果readline()在超时时间内没有读到任何数据,它会返回一个空字节串b'',此时应退出循环。
  • line.decode('utf-8').strip(): 将接收到的字节串解码为字符串,并移除前后的空白符(包括\n)。如果设备使用不同的编码,需要修改'utf-8'。
  • UnicodeDecodeError处理:如果解码失败,说明接收到的数据可能不是预期的文本格式。此时打印其十六进制表示有助于调试。

2. 使用 read() 读取特定长度或所有可用数据

如果设备不以行结尾符响应,或者你需要读取固定长度的数据块,可以使用ser.read(size)。

# ... (串口初始化代码同上) ...

# 假设 ser 已经是一个打开的串口对象
# ser = setup_serial_connection(port='COM4', baudrate=9600, timeout=1)

# 使用 MockSerial 模拟
ser = MockSerial()
ser.timeout = 0.5 # 确保read有超时

if ser and ser.is_open:
    try:
        command = b'GET_DATA_BLOCK' # 假设这个命令会触发设备发送一个固定长度的数据块
        ser.write(command)
        time.sleep(0.1)

        # 假设我们知道设备会返回10个字节的数据
        expected_data_length = 10
        received_data = b''

        # 循环读取直到达到预期长度或超时
        while len(received_data) < expected_data_length:
            chunk = ser.read(expected_data_length - len(received_data))
            if not chunk: # 超时或没有更多数据
                break
            received_data += chunk
            print(f"已接收 {len(received_data)} / {expected_data_length} 字节")

        if received_data:
            print(f"\n接收到的原始数据 (字节): {received_data}")
            try:
                print(f"解码后的数据: {received_data.decode('ascii').strip()}")
            except UnicodeDecodeError:
                print(f"解码失败,原始数据(hex): {received_data.hex()}")
        else:
            print("未从设备接收到任何数据。")

    except serial.SerialException as e:
        print(f"串口通信错误: {e}")
    finally:
        if ser.is_open:
            ser.close()
            print("串口已关闭。")
登录后复制

ser.read(size)的特点:

  • size:指定要读取的最大字节数。
  • 如果size个字节在超时时间内未完全读取到,read()会返回所有已读取到的字节。
  • 如果超时时间内没有数据可用,read()会返回空字节串b''。

3. 处理连续数据流

如果设备持续发送数据,你可能需要一个循环来不断读取。

# ... (串口初始化代码同上) ...
# ser = setup_serial_connection(port='COM4', baudrate=9600, timeout=0.1) # 较短的超时

# 使用 MockSerial 模拟,模拟持续发送数据
class ContinuousMockSerial(MockSerial):
    def __init__(self):
        super().__init__()
        self.counter = 0
    def write(self, data):
        print(f"MockSerial: 发送数据: {data.decode('utf-8').strip()}")
        # 模拟设备收到'START'后开始持续发送数据
        if data == b'START':
            self.start_time = time.time()
            self.sending = True
        elif data == b'STOP':
            self.sending = False
    def readline(self):
        if hasattr(self, 'sending') and self.sending and (time.time() - self.start_time) > 0.1:
            self.buffer += f"Data_Point_{self.counter}\n".encode('utf-8')
            self.counter += 1
            self.start_time = time.time() # 重置发送时间
        return super().readline()

ser = ContinuousMockSerial()
ser.timeout = 0.5 # 确保readline有超时

if ser and ser.is_open:
    print("开始监听连续数据...")
    try:
        ser.write(b'START') # 告知设备开始发送数据
        for i in range(10): # 循环读取10次
            line = ser.readline()
            if line:
                try:
                    print(f"接收到: {line.decode('utf-8').strip()}")
                except UnicodeDecodeError:
                    print(f"解码失败,原始数据(hex): {line.hex()}")
            else:
                print("未接收到数据,可能设备停止发送或超时。")
            time.sleep(0.2) # 模拟处理间隔

        ser.write(b'STOP') # 告知设备停止发送数据
        print("停止监听。")

    except serial.SerialException as e:
        print(f"串口通信错误: {e}")
    finally:
        if ser.is_open:
            ser.close()
            print("串口已关闭。")
登录后复制

总结与最佳实践

  1. 理解设备协议: 这是最重要的。务必查阅设备制造商提供的通信协议文档,了解其命令格式、响应格式、波特率、数据位、停止位、奇偶校验和流控制设置。
  2. 区分本地回显与设备响应: 终端工具的“本地回显”功能可能会误导你。pySerial不会自动回显你发送的数据。
  3. 发送触发命令: 只有发送了设备能识别并会触发响应的命令,设备才会发送数据。
  4. 合理设置timeout: ser.timeout是控制读取操作阻塞时间的关键。设置为None会导致无限等待,设置为0会立即返回。对于大多数应用,建议设置一个非零的合理值。
  5. 选择合适的读取方法:
    • ser.readline():适用于设备以行(\n或\r结尾)发送数据的情况。
    • ser.read(size):适用于读取固定长度的数据块或任意字节流。
    • ser.read_until(expected=b'\n', size=None):适用于读取直到遇到特定终止符。
  6. 处理编码和解码: 串口通信通常涉及字节串。发送时,字符串需要.encode('utf-8')(或其他编码)转换为字节串;接收时,字节串需要.decode('utf-8')转换为字符串。
  7. 异常处理: 使用try...except serial.SerialException来捕获串口相关的错误,并确保在finally块中关闭串口ser.close()。
  8. 考虑流控制: 如果设备需要硬件或软件流控制,请确保在pySerial中正确配置rtscts、dsrdtr或xonxoff。

遵循这些原则,你将能更有效地使用pySerial与各种串口设备进行稳定可靠的通信。

以上就是使用pySerial进行串口通信:数据接收的常见陷阱与最佳实践的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习
PHP中文网抖音号
发现有趣的

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号