
PyADS库通过其通知(Notification)机制,允许开发者订阅PLC变量的变化,并在变量值更新时触发回调函数。这对于需要高速、实时获取数据的应用场景(例如,每200毫秒或更快)至关重要。然而,回调函数的特性决定了它无法直接返回数据,这给需要累积大量数据或管理内部状态的应用带来了挑战。传统的解决方案可能倾向于使用全局变量,但这通常会导致代码难以维护、可读性差,并可能引发并发问题。
对于需要采集大量数据(例如,每周期1000个值,累积15个信号共100,000个值)的应用,直接在回调函数中处理所有数据并累积,同时避免全局变量,是核心问题。
解决回调函数状态管理问题的“Pythonic”方法是采用面向对象编程,将PyADS连接、通知设置以及数据处理逻辑封装在一个类中。这样,回调函数就可以作为类的方法,自然地访问和修改类的实例变量,从而实现数据的累积和状态的维护,而无需依赖全局变量。
以下是一个将PyADS通知封装到类中的基本结构示例:
import pyads
import ctypes
import numpy as np
import time
# 假设的PLC连接参数和变量定义
PLC_IP = '192.168.1.100'
PLC_AMS_NET_ID = '192.168.1.100.1.1'
LOCAL_AMS_NET_ID = '192.168.1.50.1.1' # 根据实际情况配置本地AMS Net ID
# 示例结构体定义,用于从PLC读取多个DINT数组
structure_def = (
('nVar1', pyads.PLCTYPE_DINT, 1000),
('nVar2', pyads.PLCTYPE_DINT, 1000),
('nVar3', pyads.PLCTYPE_DINT, 1000),
# ... 可以根据需要添加更多变量
)
SIZE_OF_STRUCTURE = pyads.size_of_structure(structure_def)
class PlcDataManager:
def __init__(self, plc_ip, plc_ams_net_id, local_ams_net_id):
self.plc = pyads.Connection(plc_ip, plc_ams_net_id, local_ams_net_id)
self.data_buffer = [] # 用于累积接收到的数据
self.notification_handles = {} # 存储通知句柄,便于管理
self.plc_state_run = False # PLC运行状态标记
try:
self.plc.open()
print(f"成功连接到PLC: {plc_ip}")
self._setup_notifications()
except pyads.ADSError as e:
print(f"连接PLC失败: {e}")
self.plc = None # 标记连接失败
def _setup_notifications(self):
"""设置所有ADS通知。"""
if not self.plc:
return
# 1. 订阅数据变量通知
data_var_name = 'global.sample_structure' # 假设PLC中存在此结构体变量
attr_data = pyads.NotificationAttrib(SIZE_OF_STRUCTURE)
try:
handle_data = self.plc.add_device_notification(
data_var_name, attr_data, self._on_data_received,
pyads.ADSTransMode.OnChange, 200 # 每200ms检查一次变化
)
self.notification_handles[data_var_name] = handle_data
print(f"已为 '{data_var_name}' 设置数据通知,句柄: {handle_data}")
except pyads.ADSError as e:
print(f"设置数据通知失败: {e}")
# 2. 订阅PLC状态变化通知 (ADSIGRP_DEVICE_DATA, ADSIOFFS_DEVDATA_ADSSTATE)
# 这对于监控PLC是否处于运行模式非常有用
plc_state_addr = (int("0xF100", 16), int("0x0000", 16)) # ADSIGRP_DEVICE_DATA, ADSIOFFS_DEVDATA_ADSSTATE
attr_state = pyads.NotificationAttrib(ctypes.sizeof(pyads.PLCTYPE_INT))
try:
handle_state = self.plc.add_device_notification(
plc_state_addr, attr_state, self._on_plc_status_change,
pyads.ADSTransMode.OnChange, 500 # 每500ms检查一次状态变化
)
self.notification_handles['plc_status'] = handle_state
print(f"已为PLC状态设置通知,句柄: {handle_state}")
except pyads.ADSError as e:
print(f"设置PLC状态通知失败: {e}")
def _on_data_received(self, handle, name, timestamp, value):
"""数据变量通知回调函数。"""
# 使用pyads.dict_from_bytes进行初始转换,适用于混合类型或小批量数据
# values = pyads.dict_from_bytes(value, structure_def)
# self.data_buffer.append(values)
# 对于大规模同类型数据,推荐手动解析以优化性能
# 假设所有变量都是PLCTYPE_DINT,且长度相同
dint_size = ctypes.sizeof(pyads.PLCTYPE_DINT)
num_elements_per_var = structure_def[0][2] # 假设所有变量长度相同
total_elements = len(structure_def) * num_elements_per_var
# 将字节数据直接转换为NumPy数组
# 注意:这里假设字节序与系统匹配,通常ADS是小端序
# 如果需要,可以使用 np.frombuffer(value, dtype='<i4') 指定小端序32位整数
raw_data = np.frombuffer(value, dtype=np.int32)
# 将一维NumPy数组重新塑形为字典,方便访问
current_data = {}
for i, (var_name, _, var_len) in enumerate(structure_def):
start_idx = i * var_len
end_idx = start_idx + var_len
current_data[var_name] = raw_data[start_idx:end_idx]
self.data_buffer.append(current_data)
# print(f"接收到数据,当前缓冲区大小: {len(self.data_buffer)}")
def _on_plc_status_change(self, notification, _):
"""PLC状态变化通知回调函数。"""
# pyads.parse_notification用于解析原始通知字节数据
*_, value = self.plc.parse_notification(notification, pyads.PLCTYPE_INT)
if value == pyads.ADSSTATE_RUN:
self.plc_state_run = True
print("PLC进入运行模式 (ADSSTATE_RUN)")
else:
self.plc_state_run = False
print(f"PLC退出运行模式,当前状态: {pyads.ADSSTATE_NAMES.get(value, '未知状态')}")
# 可以触发错误处理或重连逻辑
def get_accumulated_data(self):
"""获取并清空累积的数据缓冲区。"""
data = self.data_buffer[:]
self.data_buffer.clear()
return data
def close(self):
"""关闭PLC连接并移除所有通知。"""
if self.plc:
for name, handle in self.notification_handles.items():
try:
self.plc.del_device_notification(handle)
print(f"已移除通知: {name} (句柄: {handle})")
except pyads.ADSError as e:
print(f"移除通知 '{name}' 失败: {e}")
self.plc.close()
print("PLC连接已关闭。")
# 示例使用
if __name__ == "__main__":
manager = PlcDataManager(PLC_IP, PLC_AMS_NET_ID, LOCAL_AMS_NET_ID)
if manager.plc: # 确保PLC连接成功
print("等待数据...")
try:
# 模拟主程序循环,等待数据并进行处理
for i in range(20): # 运行约4秒 (20 * 200ms)
time.sleep(0.2) # 模拟主程序等待
if manager.data_buffer:
collected_data = manager.get_accumulated_data()
print(f"循环 {i+1}: 收集到 {len(collected_data)} 批数据。")
# 这里可以对 collected_data 进行进一步处理,例如存储到文件或数据库
# print(f"最新一批数据示例: {collected_data[-1]['nVar1'][0:5]}") # 打印第一批数据的nVar1前5个元素
else:
print(f"循环 {i+1}: 暂无新数据。")
except KeyboardInterrupt:
print("\n程序被用户中断。")
finally:
manager.close()
else:
print("无法运行,PLC连接失败。")在上述示例中:
对于大规模、同类型数据的采集,pyads.dict_from_bytes 或默认的变量读取方式可能会因为Python层面的频繁类型转换而成为性能瓶颈。当PLC发送的数据是连续的字节流,且其中包含大量相同类型的元素(如1000个DINT),更高效的方法是:
# 在 _on_data_received 方法中进行优化
# ...
def _on_data_received(self, handle, name, timestamp, value):
"""数据变量通知回调函数,优化大规模数据解析。"""
# 假设所有变量都是PLCTYPE_DINT,且长度相同
# 例如:structure_def = (('nVar1', pyads.PLCTYPE_DINT, 1000), ...)
# 将字节数据直接转换为NumPy数组
# dtype='<i4' 表示小端序32位带符号整数 (DINT)
# 根据实际PLC的字节序和数据类型调整 dtype
raw_data_np = np.frombuffer(value, dtype='<i4')
current_data = {}
offset = 0
for var_name, var_type, var_len in structure_def:
# 获取每个变量的字节大小
type_size = ctypes.sizeof(var_type)
# 计算当前变量在NumPy数组中的元素范围
start_idx = offset
end_idx = offset + var_len
current_data[var_name] = raw_data_np[start_idx:end_idx]
offset += var_len # 更新偏移量
self.data_buffer.append(current_data)
# print(f"接收到数据,当前缓冲区大小: {len(self.data_buffer)}")
# ...这种方法通过NumPy的底层C实现进行批量数据转换,可以带来数量级的性能提升,尤其适用于数据记录和大数据量处理场景。
通过采用类封装和高性能的数据解析策略,开发者可以构建出更加健壮、高效且易于维护的PyADS应用程序,从而有效地处理来自PLC的高速、大规模数据流。
以上就是高效管理PyADS通知与大规模数据采集的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号