高效管理PyADS通知与大规模数据采集

聖光之護
发布: 2025-08-11 17:22:29
原创
760人浏览过

高效管理pyads通知与大规模数据采集

本文旨在深入探讨如何利用Python的PyADS库高效地从倍福PLC获取实时数据,特别是针对高频、大规模数据采集场景。我们将重点介绍如何通过面向对象的方法(类)来管理回调函数的内部状态和累积数据,从而避免使用全局变量,并提供性能优化策略,尤其是在处理同类型大量数据时,通过自定义字节解析结合NumPy实现数据转换的显著加速。

PyADS通知机制与状态管理挑战

PyADS库通过其通知(Notification)机制,允许开发者订阅PLC变量的变化,并在变量值更新时触发回调函数。这对于需要高速、实时获取数据的应用场景(例如,每200毫秒或更快)至关重要。然而,回调函数的特性决定了它无法直接返回数据,这给需要累积大量数据或管理内部状态的应用带来了挑战。传统的解决方案可能倾向于使用全局变量,但这通常会导致代码难以维护、可读性差,并可能引发并发问题。

对于需要采集大量数据(例如,每周期1000个值,累积15个信号共100,000个值)的应用,直接在回调函数中处理所有数据并累积,同时避免全局变量,是核心问题。

基于类的PyADS通知管理与数据累积

解决回调函数状态管理问题的“Pythonic”方法是采用面向对象编程,将PyADS连接、通知设置以及数据处理逻辑封装在一个类中。这样,回调函数就可以作为类的方法,自然地访问和修改类的实例变量,从而实现数据的累积和状态的维护,而无需依赖全局变量。

类结构示例

以下是一个将PyADS通知封装到类中的基本结构示例:

import pyads
import ctypes
import numpy as np
import time

# 假设的PLC连接参数和变量定义
PLC_IP = '192.168.1.100'
PLC_AMS_NET_ID = '192.168.1.100.1.1'
LOCAL_AMS_NET_ID = '192.168.1.50.1.1' # 根据实际情况配置本地AMS Net ID

# 示例结构体定义,用于从PLC读取多个DINT数组
structure_def = (
    ('nVar1', pyads.PLCTYPE_DINT, 1000),
    ('nVar2', pyads.PLCTYPE_DINT, 1000),
    ('nVar3', pyads.PLCTYPE_DINT, 1000),
    # ... 可以根据需要添加更多变量
)
SIZE_OF_STRUCTURE = pyads.size_of_structure(structure_def)

class PlcDataManager:
    def __init__(self, plc_ip, plc_ams_net_id, local_ams_net_id):
        self.plc = pyads.Connection(plc_ip, plc_ams_net_id, local_ams_net_id)
        self.data_buffer = [] # 用于累积接收到的数据
        self.notification_handles = {} # 存储通知句柄,便于管理
        self.plc_state_run = False # PLC运行状态标记

        try:
            self.plc.open()
            print(f"成功连接到PLC: {plc_ip}")
            self._setup_notifications()
        except pyads.ADSError as e:
            print(f"连接PLC失败: {e}")
            self.plc = None # 标记连接失败

    def _setup_notifications(self):
        """设置所有ADS通知。"""
        if not self.plc:
            return

        # 1. 订阅数据变量通知
        data_var_name = 'global.sample_structure' # 假设PLC中存在此结构体变量
        attr_data = pyads.NotificationAttrib(SIZE_OF_STRUCTURE)
        try:
            handle_data = self.plc.add_device_notification(
                data_var_name, attr_data, self._on_data_received,
                pyads.ADSTransMode.OnChange, 200 # 每200ms检查一次变化
            )
            self.notification_handles[data_var_name] = handle_data
            print(f"已为 '{data_var_name}' 设置数据通知,句柄: {handle_data}")
        except pyads.ADSError as e:
            print(f"设置数据通知失败: {e}")

        # 2. 订阅PLC状态变化通知 (ADSIGRP_DEVICE_DATA, ADSIOFFS_DEVDATA_ADSSTATE)
        # 这对于监控PLC是否处于运行模式非常有用
        plc_state_addr = (int("0xF100", 16), int("0x0000", 16)) # ADSIGRP_DEVICE_DATA, ADSIOFFS_DEVDATA_ADSSTATE
        attr_state = pyads.NotificationAttrib(ctypes.sizeof(pyads.PLCTYPE_INT))
        try:
            handle_state = self.plc.add_device_notification(
                plc_state_addr, attr_state, self._on_plc_status_change,
                pyads.ADSTransMode.OnChange, 500 # 每500ms检查一次状态变化
            )
            self.notification_handles['plc_status'] = handle_state
            print(f"已为PLC状态设置通知,句柄: {handle_state}")
        except pyads.ADSError as e:
            print(f"设置PLC状态通知失败: {e}")

    def _on_data_received(self, handle, name, timestamp, value):
        """数据变量通知回调函数。"""
        # 使用pyads.dict_from_bytes进行初始转换,适用于混合类型或小批量数据
        # values = pyads.dict_from_bytes(value, structure_def)
        # self.data_buffer.append(values)

        # 对于大规模同类型数据,推荐手动解析以优化性能
        # 假设所有变量都是PLCTYPE_DINT,且长度相同
        dint_size = ctypes.sizeof(pyads.PLCTYPE_DINT)
        num_elements_per_var = structure_def[0][2] # 假设所有变量长度相同
        total_elements = len(structure_def) * num_elements_per_var

        # 将字节数据直接转换为NumPy数组
        # 注意:这里假设字节序与系统匹配,通常ADS是小端序
        # 如果需要,可以使用 np.frombuffer(value, dtype='<i4') 指定小端序32位整数
        raw_data = np.frombuffer(value, dtype=np.int32) 

        # 将一维NumPy数组重新塑形为字典,方便访问
        current_data = {}
        for i, (var_name, _, var_len) in enumerate(structure_def):
            start_idx = i * var_len
            end_idx = start_idx + var_len
            current_data[var_name] = raw_data[start_idx:end_idx]

        self.data_buffer.append(current_data)
        # print(f"接收到数据,当前缓冲区大小: {len(self.data_buffer)}")

    def _on_plc_status_change(self, notification, _):
        """PLC状态变化通知回调函数。"""
        # pyads.parse_notification用于解析原始通知字节数据
        *_, value = self.plc.parse_notification(notification, pyads.PLCTYPE_INT)

        if value == pyads.ADSSTATE_RUN:
            self.plc_state_run = True
            print("PLC进入运行模式 (ADSSTATE_RUN)")
        else:
            self.plc_state_run = False
            print(f"PLC退出运行模式,当前状态: {pyads.ADSSTATE_NAMES.get(value, '未知状态')}")
            # 可以触发错误处理或重连逻辑

    def get_accumulated_data(self):
        """获取并清空累积的数据缓冲区。"""
        data = self.data_buffer[:]
        self.data_buffer.clear()
        return data

    def close(self):
        """关闭PLC连接并移除所有通知。"""
        if self.plc:
            for name, handle in self.notification_handles.items():
                try:
                    self.plc.del_device_notification(handle)
                    print(f"已移除通知: {name} (句柄: {handle})")
                except pyads.ADSError as e:
                    print(f"移除通知 '{name}' 失败: {e}")
            self.plc.close()
            print("PLC连接已关闭。")

# 示例使用
if __name__ == "__main__":
    manager = PlcDataManager(PLC_IP, PLC_AMS_NET_ID, LOCAL_AMS_NET_ID)

    if manager.plc: # 确保PLC连接成功
        print("等待数据...")
        try:
            # 模拟主程序循环,等待数据并进行处理
            for i in range(20): # 运行约4秒 (20 * 200ms)
                time.sleep(0.2) # 模拟主程序等待
                if manager.data_buffer:
                    collected_data = manager.get_accumulated_data()
                    print(f"循环 {i+1}: 收集到 {len(collected_data)} 批数据。")
                    # 这里可以对 collected_data 进行进一步处理,例如存储到文件或数据库
                    # print(f"最新一批数据示例: {collected_data[-1]['nVar1'][0:5]}") # 打印第一批数据的nVar1前5个元素
                else:
                    print(f"循环 {i+1}: 暂无新数据。")

        except KeyboardInterrupt:
            print("\n程序被用户中断。")
        finally:
            manager.close()
    else:
        print("无法运行,PLC连接失败。")
登录后复制

在上述示例中:

知我AI
知我AI

一款多端AI知识助理,通过一键生成播客/视频/文档/网页文章摘要、思维导图,提高个人知识获取效率;自动存储知识,通过与知识库聊天,提高知识利用效率。

知我AI26
查看详情 知我AI
  • PlcDataManager 类封装了与PLC的连接、通知的设置以及数据缓冲区 self.data_buffer。
  • _on_data_received 方法作为回调函数,它是一个实例方法,可以直接访问 self.data_buffer 并将接收到的数据追加进去。
  • _on_plc_status_change 方法展示了如何使用 pyads.parse_notification 来解析PLC状态变化的通知,这对于监控PLC运行状态非常有用。
  • get_accumulated_data 方法允许主程序在需要时获取并清空累积的数据,实现数据消费与生产的分离。

高性能数据解析与优化

对于大规模、同类型数据的采集,pyads.dict_from_bytes 或默认的变量读取方式可能会因为Python层面的频繁类型转换而成为性能瓶颈。当PLC发送的数据是连续的字节流,且其中包含大量相同类型的元素(如1000个DINT),更高效的方法是:

  1. 直接获取原始字节数据: 在 add_device_notification 中,回调函数接收到的 value 参数即为原始字节数据。
  2. 利用NumPy进行批量转换: Python的NumPy库在处理数值数组方面具有极高的效率。可以将原始字节数据直接转换为NumPy数组,然后根据结构体定义进行切片和重塑。
# 在 _on_data_received 方法中进行优化
# ...
    def _on_data_received(self, handle, name, timestamp, value):
        """数据变量通知回调函数,优化大规模数据解析。"""
        # 假设所有变量都是PLCTYPE_DINT,且长度相同
        # 例如:structure_def = (('nVar1', pyads.PLCTYPE_DINT, 1000), ...)

        # 将字节数据直接转换为NumPy数组
        # dtype='<i4' 表示小端序32位带符号整数 (DINT)
        # 根据实际PLC的字节序和数据类型调整 dtype
        raw_data_np = np.frombuffer(value, dtype='<i4') 

        current_data = {}
        offset = 0
        for var_name, var_type, var_len in structure_def:
            # 获取每个变量的字节大小
            type_size = ctypes.sizeof(var_type)

            # 计算当前变量在NumPy数组中的元素范围
            start_idx = offset
            end_idx = offset + var_len
            current_data[var_name] = raw_data_np[start_idx:end_idx]
            offset += var_len # 更新偏移量

        self.data_buffer.append(current_data)
        # print(f"接收到数据,当前缓冲区大小: {len(self.data_buffer)}")
# ...
登录后复制

这种方法通过NumPy的底层C实现进行批量数据转换,可以带来数量级的性能提升,尤其适用于数据记录和大数据量处理场景。

注意事项与总结

  • 装饰器限制: pyads.notification 装饰器不能直接用于类方法。必须通过 plc.add_device_notification 方法显式地将类方法注册为回调函数。
  • 字节序与数据类型: 在手动解析字节数据时,务必注意PLC的数据字节序(通常为小端序)以及正确的数据类型映射(例如,PyADS的 PLCTYPE_DINT 对应NumPy的 np.int32 或 np.intc,且需考虑字节序)。
  • 错误处理: 监控PLC的状态变化(如 _on_plc_status_change 所示)对于构建健壮的通信应用至关重要,可以在PLC退出运行模式时触发告警或重连机制。
  • 资源管理: 确保在程序结束时正确关闭PLC连接并移除所有通知,释放资源。

通过采用类封装和高性能的数据解析策略,开发者可以构建出更加健壮、高效且易于维护的PyADS应用程序,从而有效地处理来自PLC的高速、大规模数据流。

以上就是高效管理PyADS通知与大规模数据采集的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习
PHP中文网抖音号
发现有趣的

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号