
pyads库提供了一种通过ads通知(notification)机制从倍福plc高效获取数据的方式。与轮询不同,通知允许plc在数据发生变化时主动向python应用程序发送更新,这对于需要快速响应(如200ms周期)和处理大量数据(如每周期1000个值,累积形成100000个值的数据序列)的场景至关重要。
然而,在处理通知回调时,开发者常面临一个挑战:如何将回调函数内部接收到的数据安全、高效地传递给主程序或其他处理逻辑,同时避免使用不推荐的全局变量。特别是当需要将多次回调的数据累积成一个大型数组时,这个问题变得尤为突出。Pyads的回调函数是独立的,直接在函数内部处理所有逻辑或依赖全局变量会使代码难以维护和扩展。
解决上述问题的核心在于利用Python的面向对象特性,将Pyads连接、通知设置以及数据处理逻辑封装到一个类中。这种方法不仅能够避免全局变量的使用,还能更好地管理应用程序的状态和数据流。
1. 类结构设计
通过将Pyads连接作为类的一个成员变量,并在类的初始化方法中设置通知,可以将回调函数定义为类的成员方法。这样,回调函数就可以直接访问和修改类的其他成员变量,从而实现数据的累积和状态的更新。
import pyads
import ctypes
import numpy as np # 提前引入,用于后续数据优化
class PlcDataManager:
def __init__(self, ams_net_id, plc_ip, ams_port):
"""
初始化PLC连接和数据管理器。
"""
self.plc = pyads.Connection(ams_net_id, plc_ip, ams_port)
self.plc.open()
self.data_buffer = [] # 用于累积接收到的数据
self.notification_handles = {} # 存储通知句柄,便于后续释放
print(f"PLC connection established: {plc_ip}")
# 示例:初始化心跳计数器
self.heartbeat_count = 0
self.plc_status_message = "PLC is running"
self.setup_notifications()
def setup_notifications(self):
"""
设置ADS通知。
"""
# 示例1:PLC心跳通知
heartbeat_var_name = 'global.heartbeat' # 假设PLC中有一个心跳变量
heartbeat_attr = pyads.NotificationAttrib(ctypes.sizeof(pyads.PLCTYPE_BOOL))
handle_heartbeat = self.plc.add_device_notification(
heartbeat_var_name,
heartbeat_attr,
self.on_plc_heartbeat # 回调方法
)
self.notification_handles[heartbeat_var_name] = handle_heartbeat
print(f"Notification set for {heartbeat_var_name}, handle: {handle_heartbeat}")
# 示例2:PLC状态变化通知 (ADSSTATE_RUN, ADSSTATE_CONFIG等)
# ADSIGRP_DEVICE_DATA (0xF100), ADSIOFFS_DEVDATA_ADSSTATE (0x0000) 是特殊的地址用于获取PLC状态
plc_status_addr = (int("0xF100", 16), int("0x0000", 16))
status_attr = pyads.NotificationAttrib(ctypes.sizeof(pyads.PLCTYPE_INT))
handle_status = self.plc.add_device_notification(
plc_status_addr,
status_attr,
self.on_plc_status_change # 回调方法
)
self.notification_handles["PLC_Status"] = handle_status
print(f"Notification set for PLC Status, handle: {handle_status}")
# 示例3:自定义结构体数据通知 (原始问题中的场景)
# 假设PLC中有一个名为 'global.sample_structure' 的变量
structure_def = (
('nVar', pyads.PLCTYPE_DINT, 1000),
('nVar2', pyads.PLCTYPE_DINT, 1000),
('nVar3', pyads.PLCTYPE_DINT, 1000),
('nVar4', pyads.PLCTYPE_DINT, 1000),
('nVar5', pyads.PLCTYPE_DINT, 1000)
)
size_of_struct = pyads.size_of_structure(structure_def)
data_attr = pyads.NotificationAttrib(size_of_struct)
handle_data = self.plc.add_device_notification(
'global.sample_structure',
data_attr,
self.on_data_received # 回调方法
)
self.notification_handles['global.sample_structure'] = handle_data
print(f"Notification set for global.sample_structure, handle: {handle_data}")
self.structure_definition = structure_def # 保存结构体定义供回调使用
def on_plc_heartbeat(self, handle, name, timestamp, value):
"""
PLC心跳通知回调。
"""
# value 默认是 ctypes.c_ubyte * size 的字节数组,对于简单类型可以直接判断
# 或者使用 pyads.dict_from_bytes(value, [(name, pyads.PLCTYPE_BOOL)])
# 这里假设value直接是布尔值对应的字节
is_active = bool(value[0]) # 假设PLCBOOL是单字节
if is_active:
self.heartbeat_count += 1
# print(f"Heartbeat received: {self.heartbeat_count}")
def on_plc_status_change(self, notification, _):
"""
PLC状态变化通知回调。
"""
# 使用parse_notification解析通知数据
*_, value = self.plc.parse_notification(notification, pyads.PLCTYPE_INT)
if value != pyads.ADSSTATE_RUN:
self.plc_status_message = f"PLC exited run mode (State: {value})"
print(self.plc_status_message)
else:
self.plc_status_message = "PLC is running"
# print(self.plc_status_message)
def on_data_received(self, handle, name, timestamp, value):
"""
接收结构体数据通知回调。
"""
# 将字节数据转换为字典
values_dict = pyads.dict_from_bytes(value, self.structure_definition)
# print(f"Received data: {values_dict['nVar'][0]}...") # 打印第一个元素示例
# 将数据添加到缓冲区
# 假设我们只关心 'nVar' 信号的数据,并将其展平
self.data_buffer.extend(values_dict['nVar'])
# 检查是否达到100000个值,并进行处理
if len(self.data_buffer) >= 100000:
print(f"Collected {len(self.data_buffer)} values. Processing...")
# 在这里可以对 data_buffer 进行进一步处理,例如保存到文件、进行分析等
processed_array = np.array(self.data_buffer[:100000]) # 取前100000个
print(f"First 10 values of processed array: {processed_array[:10]}")
self.data_buffer = self.data_buffer[100000:] # 清除已处理的数据,保留剩余部分
def close(self):
"""
关闭PLC连接并移除所有通知。
"""
for var_name, handle in self.notification_handles.items():
try:
self.plc.del_device_notification(handle)
print(f"Removed notification for {var_name}, handle: {handle}")
except Exception as e:
print(f"Error removing notification for {var_name}: {e}")
self.plc.close()
print("PLC connection closed.")
# 示例使用
if __name__ == "__main__":
# 请替换为你的PLC实际连接参数
AMS_NET_ID = '192.168.1.100.1.1' # PLC的AMS Net ID
PLC_IP = '192.168.1.100' # PLC的IP地址
AMS_PORT = 851 # 通常为851
data_manager = PlcDataManager(AMS_NET_ID, PLC_IP, AMS_PORT)
try:
# 在主循环中等待数据或执行其他任务
# Pyads通知通常在后台线程中运行,因此主线程可以保持活跃
while True:
# 可以在这里执行其他非阻塞任务
# 例如:检查数据缓冲区大小,或者每隔一段时间打印心跳计数
# print(f"Current heartbeat: {data_manager.heartbeat_count}, PLC Status: {data_manager.plc_status_message}")
# print(f"Data buffer size: {len(data_manager.data_buffer)}")
# time.sleep(1) # 模拟主程序忙碌
pass # 实际应用中会在这里有更复杂的逻辑
except KeyboardInterrupt:
print("\nExiting program.")
finally:
data_manager.close()2. 关键点:
当处理的PLC数据量巨大且类型一致时,pyads.dict_from_bytes 这样的逐变量转换方法可能会成为性能瓶颈。Pyads允许用户获取原始的 ctypes 字节数据,然后自行进行更高效的转换,例如利用 numpy 库。
1. return_ctypes=True 的应用
在 add_device_notification 或 read_by_name 方法中,可以通过设置 return_ctypes=True 来获取原始的 ctypes 对象,而不是直接解析为Python字典或基本类型。
# 假设 structure_def 和 size_of_struct 已定义
# 修改 add_device_notification 设置,让回调接收原始ctypes数据
handle_data_optimized = self.plc.add_device_notification(
'global.sample_structure',
data_attr, # 仍然使用相同的NotificationAttrib
self.on_data_received_optimized,
return_ctypes=True # 关键在于这里
)
self.notification_handles['global.sample_structure_optimized'] = handle_data_optimized
# 新的回调方法,处理原始ctypes数据
def on_data_received_optimized(self, handle, name, timestamp, value_ctypes):
"""
接收原始ctypes数据通知回调,并使用numpy优化转换。
value_ctypes 是一个 ctypes 数组对象 (e.g., ctypes.c_ubyte * size_of_struct)
"""
# 假设我们知道数据结构是连续的PLCTYPE_DINT数组
# 并且我们只关心第一个变量 'nVar',它是一个1000个DINT的数组
# 获取原始字节数据
buffer = bytearray(value_ctypes)
# 计算 'nVar' 在结构体中的偏移和大小
# 这需要根据 structure_def 手动计算或预先确定
# 假设 nVar 是结构体的第一个元素,且每个DINT是4字节
num_elements = 1000
element_size = ctypes.sizeof(pyads.PLCTYPE_DINT) # 4 bytes for DINT
nvar_byte_length = num_elements * element_size
# 使用numpy从字节缓冲区直接创建数组
# dtype='<i4' 表示 little-endian 32位有符号整数 (DINT)
# frombuffer_copy 可以避免内存视图问题,确保数据被复制
# 这里只读取 nVar 部分的数据
try:
nvar_array = np.frombuffer(buffer[:nvar_byte_length], dtype='<i4')
self.data_buffer.extend(nvar_array.tolist()) # 转换为列表以便extend
# print(f"Optimized received data (first 5): {nvar_array[:5]}")
# 检查是否达到100000个值,并进行处理
if len(self.data_buffer) >= 100000:
print(f"Collected {len(self.data_buffer)} values (optimized). Processing...")
processed_array = np.array(self.data_buffer[:100000])
print(f"First 10 values of processed array (optimized): {processed_array[:10]}")
self.data_buffer = self.data_buffer[100000:]
except ValueError as e:
print(f"Error converting data with numpy: {e}")
# Fallback to slower method or log error
values_dict = pyads.dict_from_bytes(bytearray(value_ctypes), self.structure_definition)
self.data_buffer.extend(values_dict['nVar'])2. numpy 进行数据转换
numpy.frombuffer() 函数能够从一个缓冲区(如 bytearray 或 memoryview)直接解析数据,并以指定的数据类型(dtype)和字节序(endianness)创建 numpy 数组。对于连续存储的同类型数据,这种方法比逐个元素解析快几个数量级。
注意事项:
通过将Pyads通知回调和数据处理逻辑封装到类中,可以有效地管理应用程序的状态,避免全局变量的混乱,并提高代码的可读性和可维护性。对于需要处理大量、高频PLC数据的场景,进一步利用 pyads 的 return_ctypes=True 选项结合 numpy 进行数据转换,能够显著提升数据处理性能。
其他最佳实践:
遵循这些原则,可以构建出高效、稳定且易于维护的Pyads应用程序,有效应对复杂的工业自动化数据采集需求。
以上就是高效管理Pyads通知回调与大规模数据处理的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号