
pyads库提供了强大的功能,允许Python程序与倍福(Beckhoff)PLC进行ADS通信。其中,ADS通知(Notification)机制是实现实时数据采集的关键。通过注册通知,PLC变量的值一旦发生变化,pyads就会立即触发相应的回调函数,从而避免了轮询(Polling)带来的延迟和资源浪费。
然而,在处理高频、大数据量的PLC变量通知时,开发者常会遇到一些挑战:
解决这些问题的核心在于采用基于类的设计模式,并结合高效的数据解析策略。
为了避免全局变量并更好地管理状态,推荐将pyads连接、通知注册以及回调函数封装在一个类中。这种方法提供了更好的封装性、模块化和可维护性。
核心思想:
示例:基础通知管理类
以下是一个简单的示例,展示如何在一个类中管理pyads连接和注册通知,以监控一个PLC心跳变量:
import pyads
import ctypes
import time
# 假设PLC连接参数和变量名
PLC_AMS_NET_ID = '192.168.1.100.1.1' # 替换为你的PLC AMS Net ID
PLC_AMS_PORT = 851 # 替换为你的PLC AMS Port
VAR_NAME_HEARTBEAT = 'GVL.heartbeat' # 假设PLC中有一个布尔型心跳变量
class PlcDataManager:
def __init__(self, plc_ams_net_id, plc_ams_port):
"""
初始化PLC连接和数据管理器。
"""
self.plc = pyads.Connection(plc_ams_net_id, plc_ams_port)
self.heartbeat_count = 0
self.data_buffer = [] # 用于累积数据的缓冲区
self._is_connected = False
try:
self.plc.open()
self._is_connected = True
print(f"成功连接到PLC: {plc_ams_net_id}:{plc_ams_port}")
self.setup_notifications()
except pyads.ADSError as e:
print(f"连接PLC失败或设置通知失败: {e}")
self.plc = None # 确保连接失败时plc对象为None
def setup_notifications(self):
"""
设置所有ADS通知。
"""
if not self._is_connected:
print("PLC未连接,无法设置通知。")
return
# 注册心跳变量的通知
# 注意:这里不能使用 @plc.notification 装饰器,而是使用 add_device_notification 方法
self.plc.add_device_notification(
VAR_NAME_HEARTBEAT,
pyads.NotificationAttrib(ctypes.sizeof(pyads.PLCTYPE_BOOL)),
self.on_plc_heartbeat, # 将类的成员方法作为回调函数
)
print(f"已注册通知: {VAR_NAME_HEARTBEAT}")
# 示例:注册一个结构体变量的通知 (与原始问题类似)
# 假设PLC中有一个名为 'global.sample_structure' 的结构体
# structure_def = (
# ('nVar', pyads.PLCTYPE_DINT, 1000),
# ('nVar2', pyads.PLCTYPE_DINT, 1000),
# )
# size_of_struct = pyads.size_of_structure(structure_def)
# self.plc.add_device_notification(
# 'global.sample_structure',
# pyads.NotificationAttrib(size_of_struct),
# lambda handle, name, timestamp, value: self.on_sample_structure_data(handle, name, timestamp, value, structure_def)
# )
# print(f"已注册通知: global.sample_structure")
def on_plc_heartbeat(self, handle, name, timestamp, value):
"""
心跳变量通知回调函数。
"""
# value 是 ctypes.c_ubyte 数组,需要解析
# 对于布尔类型,通常是单个字节
bool_value = bool(value[0])
self.heartbeat_count += 1
print(f"[{time.time():.3f}] 心跳计数: {self.heartbeat_count}, 值: {bool_value}")
# 在这里可以进行数据处理或累积
self.data_buffer.append(bool_value)
# def on_sample_structure_data(self, handle, name, timestamp, value, structure_def):
# """
# 结构体数据通知回调函数。
# """
# # 将字节数据解析为Python字典
# values = pyads.dict_from_bytes(value, structure_def)
# print(f"[{time.time():.3f}] 接收到结构体数据: {name}, 值: {values}")
# # 将数据添加到类的缓冲区
# self.data_buffer.extend(values['nVar']) # 假设只关心nVar
def close(self):
"""
关闭PLC连接。
"""
if self._is_connected and self.plc:
self.plc.close()
print("PLC连接已关闭。")
self._is_connected = False
# 示例使用
if __name__ == "__main__":
# 替换为你的PLC连接参数
manager = PlcDataManager(PLC_AMS_NET_ID, PLC_AMS_PORT)
if manager._is_connected:
try:
# 保持主线程运行,等待通知回调
# pyads.Connection.run() 会阻塞并处理所有通知
# 或者在一个循环中执行其他任务并等待通知
print("\n等待PLC通知 (按 Ctrl+C 退出)...")
pyads.Connection.run() # 这会阻塞并处理所有已注册的通知
except KeyboardInterrupt:
print("\n程序终止。")
finally:
manager.close()
print(f"总心跳次数: {manager.heartbeat_count}")
# print(f"累积数据量: {len(manager.data_buffer)}")注意事项:
除了用户自定义的变量,pyads还允许监听PLC的内部状态变化,例如PLC从运行模式(Run Mode)切换到配置模式(Config Mode)。这对于诊断通信问题和确保系统稳定性至关重要。
PLC状态通知通过特定的ADS组(Group)和偏移量(Offset)来注册:
以下是如何在类中实现PLC状态变化的监控:
# ... PlcDataManager 类定义不变 ...
class PlcDataManager:
# ... __init__ 和 setup_notifications 方法的其余部分 ...
def setup_notifications(self):
# ... 现有通知设置 ...
# 注册PLC状态变化的通知
self.plc.add_device_notification(
(pyads.ADSIGRP_DEVICE_DATA, pyads.ADSIOFFS_DEVDATA_ADSSTATE), # ADS组和偏移量
pyads.NotificationAttrib(ctypes.sizeof(pyads.PLCTYPE_INT)), # PLC状态通常是INT类型
self._on_plc_status_change,
)
print("已注册PLC状态变化通知。")
def _on_plc_status_change(self, notification, _):
"""
PLC状态变化通知回调函数。
当PLC状态(如从运行到配置)改变时触发。
"""
# 使用 parse_notification 解析通知数据
# parse_notification 会返回 handle, name, timestamp, value
# 我们只关心 value
*_, value = self.plc.parse_notification(notification, pyads.PLCTYPE_INT)
print(f"[{time.time():.3f}] PLC状态改变为: {value} ({pyads.ADSSTATE_NAMES.get(value, '未知状态')})")
if value != pyads.ADSSTATE_RUN:
print("警告: PLC已退出运行模式!")
# 在这里可以触发错误处理、日志记录或系统报警
self._plc_fatal_com_error_message = "PLC exited run mode"对于原始问题中提到的,每个周期需要处理1000个值,总计累积100,000个值的场景,默认的pyads.dict_from_bytes方法可能会因为其逐个变量的解析机制而效率不高。为了提高性能,尤其是在数据类型统一的情况下,推荐以下优化策略:
示例:高效解析结构体字节数据
假设你有一个包含1000个DINT类型变量的结构体:
import numpy as np
# ... PlcDataManager 类定义 ...
class PlcDataManager:
# ... 其他方法 ...
# 假设你的结构体定义如下
STRUCTURE_DEF_LARGE_ARRAY = (
('data_array', pyads.PLCTYPE_DINT, 1000), # 1000个DINT
# ... 其他变量 ...
)
SIZE_OF_LARGE_STRUCT = pyads.size_of_structure(STRUCTURE_DEF_LARGE_ARRAY)
def setup_notifications(self):
# ... 现有通知设置 ...
# 注册大型结构体通知
self.plc.add_device_notification(
'global.large_data_structure', # 假设PLC中这个结构体的名称
pyads.NotificationAttrib(self.SIZE_OF_LARGE_STRUCT),
self.on_large_data_notification,
)
print("已注册大型数据结构体通知。")
def on_large_data_notification(self, handle, name, timestamp, value):
"""
大型数据结构体通知回调函数,使用NumPy优化解析。
"""
# value 是 ctypes.c_ubyte * SIZE_OF_LARGE_STRUCT 类型
# 可以直接将其转换为bytes
byte_data = bytes(value)
# 假设结构体中只有一个名为 'data_array' 的DINT数组
# DINT是32位整数 (4字节)
dint_size = ctypes.sizeof(pyads.PLCTYPE_DINT)
num_elements = 1000
expected_bytes = dint_size * num_elements
if len(byte_data) >= expected_bytes:
# 使用np.frombuffer直接从字节缓冲区创建NumPy数组
# dtype='<i4' 表示小端32位有符号整数 (DINT)
# 如果是无符号DINT (UDINT),则使用 '<u4'
# offset 参数用于跳过结构体中其他变量的字节
# 这里假设 'data_array' 是结构体的第一个成员
np_array = np.frombuffer(byte_data, dtype='<i4', count=num_elements, offset=0)
# 可以在这里对 np_array 进行进一步处理或累积
self.data_buffer.extend(np_array.tolist()) # 将NumPy数组转换为Python列表并累积
print(f"[{time.time():.3f}] 接收到大型数据: {name}, 数组前5个值: {np_array[:5]}, 累积数据量: {len(self.data_buffer)}")
else:
print(f"[{time.time():.3f}] 警告: 接收到的字节数据长度不匹配预期。")
NumPy解析注意事项:
通过采用基于类的设计模式,我们能够以更清晰、更可维护的方式管理pyads的PLC通信和通知回调。这种方法有效地解决了全局变量的依赖问题,并使得数据在类实例内部得以安全地累积和处理。
对于高吞吐量的数据采集场景,仅仅依赖回调机制是不够的,还需要关注数据解析的效率。利用NumPy等科学计算库直接处理从pyads通知中获取的原始字节数据,可以显著提升数据转换和处理的性能。
最佳实践建议:
遵循这些原则,开发者可以构建出健壮、高效且易于维护的pyads应用程序,以满足严苛的工业数据采集需求。
以上就是pyads通知机制的高效数据处理:基于类的设计与优化实践的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号