
在物理实验或工业监控等场景中,实时图像采集和处理是核心环节。然而,当数据采集速率较高(例如2.5hz)时,系统效率就变得至关重要。原始代码在处理静态数据时表现良好,但在实时图像不断写入指定文件夹的动态场景下,却出现了亮度值计算错误的问题,且整体性能不尽如人意。这通常是由于代码结构混乱、i/o操作阻塞以及缺乏并发处理机制所导致的。本教程将针对这些问题,提供一套系统性的优化方案。
原始代码中大量使用全局变量来管理程序状态(如center, radius, is_dragging_center, brightness_history等),这使得代码难以理解、维护和扩展。在复杂的实时系统中,清晰的结构和良好的状态管理至关重要。
问题分析:
优化方案:类封装与模块化 将相关的数据和操作封装到一个或多个类中,可以有效解决上述问题。例如,可以创建一个ImageProcessor类来管理图像处理逻辑、ROI选择状态和历史数据。
示例代码结构(概念性重构):
import os
import cv2
import numpy as np
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore, QtWidgets
from collections import deque
from threading import Thread, Event, Lock
from concurrent.futures import ThreadPoolExecutor
class ImageProcessor(QtCore.QObject): # 继承QObject以便使用信号槽
# 定义信号,用于在处理线程中更新GUI
data_processed_signal = QtCore.pyqtSignal(list, list)
def __init__(self, image_folder_path, img_batch_size=5):
super().__init__()
self.image_folder_path = image_folder_path
self.img_batch_size = img_batch_size
self.center = (0, 0)
self.radius = 0
self.is_dragging_center = False
self.is_dragging_radius = False
self.resized_image = None # 用于ROI选择的显示图像
self.original_roi_center = (0,0) # 原始图像尺寸下的ROI
self.original_roi_radius = 0
self.brightness_history = []
self.std_history = []
self.img_round_brightness_sum = 0
self.img_round_var_sum = 0
self.processed_img_count = 0
self.image_queue = deque() # 用于存储待处理的图像路径
self.shutdown_event = Event() # 用于控制线程关闭
self.processing_lock = Lock() # 用于保护共享资源(如打印输出或历史数据更新)
# 线程池用于并行处理单个图像的I/O和计算
self.executor = ThreadPoolExecutor(max_workers=os.cpu_count() or 4)
def select_roi(self, first_image_path):
"""
处理ROI选择逻辑,此部分通常在主线程进行。
"""
image = cv2.imread(first_image_path)
if image is None:
print(f"Error: Could not read image {first_image_path}")
return False
scale_percent = 60
width = int(image.shape[1] * scale_percent / 100)
height = int(image.shape[0] * scale_percent / 100)
dim = (width, height)
gray_img = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
colored_image = cv2.applyColorMap(gray_img, cv2.COLORMAP_PINK)
self.resized_image = cv2.resize(colored_image, dim, interpolation=cv2.INTER_AREA)
# 初始化ROI位置
self.center = (self.resized_image.shape[1] // 2, self.resized_image.shape[0] // 2)
self.radius = min(self.resized_image.shape[1] // 3, self.resized_image.shape[0] // 3)
cv2.namedWindow("Adjust the circle (press 'Enter' to proceed)")
cv2.setMouseCallback("Adjust the circle (press 'Enter' to proceed)", self._on_mouse)
while True:
display_image = self.resized_image.copy()
cv2.circle(display_image, self.center, self.radius, (0, 255, 0), 2)
cv2.circle(display_image, self.center, 5, (0, 0, 255), thickness=cv2.FILLED)
cv2.imshow("Adjust the circle (press 'Enter' to proceed)", display_image)
key = cv2.waitKey(1) & 0xFF
if key == 13:
break
cv2.destroyAllWindows()
# 将ROI坐标转换回原始图像尺寸
self.original_roi_center = (int(self.center[0] / scale_percent * 100),
int(self.center[1] / scale_percent * 100))
self.original_roi_radius = int(self.radius / scale_percent * 100)
return True
def _on_mouse(self, event, x, y, flags, param):
"""鼠标回调函数,更新ROI中心和半径"""
cx, cy = self.center
if event == cv2.EVENT_LBUTTONDOWN:
if np.sqrt((x - cx) ** 2 + (y - cy) ** 2) < 20: # 检查是否点击中心
self.is_dragging_center = True
else:
self.is_dragging_radius = True
elif event == cv2.EVENT_LBUTTONUP:
self.is_dragging_center = False
self.is_dragging_radius = False
elif event == cv2.EVENT_MOUSEMOVE:
if self.is_dragging_center:
self.center = (x, y)
elif self.is_dragging_radius:
self.radius = int(np.sqrt((x - self.center[0]) ** 2 + (y - self.center[1]) ** 2))
def calc_xray_count(self, image_path):
"""计算单张图像的平均亮度"""
original_image = cv2.imread(image_path, cv2.IMREAD_ANYDEPTH)
if original_image is None:
print(f"Warning: Could not read image {image_path}, skipping.")
return 0, 0 # 返回默认值
median_filtered_image = cv2.medianBlur(original_image, 5)
mask = np.zeros(original_image.shape, dtype=np.uint8)
cv2.circle(mask, self.original_roi_center, self.original_roi_radius, 255, thickness=cv2.FILLED)
# 避免黑色像素不被计数
result = cv2.bitwise_and(median_filtered_image + 1, median_filtered_image + 1, mask=mask)
pixel_count = np.count_nonzero(result)
img_brightness_sum = np.sum(result)
img_var = np.var(result)
if pixel_count > 0:
img_avg_brightness = (img_brightness_sum / pixel_count) - 1 # 减去之前加的1
else:
img_avg_brightness = 0
return img_avg_brightness, img_var
def _process_single_image(self, image_path):
"""由线程池调用的单个图像处理函数"""
return self.calc_xray_count(image_path)
def _producer_thread_func(self):
"""生产者线程:监控文件夹并添加新图像到队列"""
processed_files = set() # 记录已处理的文件,避免重复处理
while not self.shutdown_event.is_set():
current_files = set(f for f in os.listdir(self.image_folder_path) if f.endswith('.TIF'))
new_files = [os.path.join(self.image_folder_path, f) for f in (current_files - processed_files)]
for new_file in sorted(new_files): # 按名称排序,确保处理顺序
if not self.shutdown_event.is_set():
self.image_queue.append(new_file)
processed_files.add(os.path.basename(new_file))
else:
break
QtCore.QThread.msleep(50) # 短暂等待,避免CPU空转
print("Producer thread shutting down.")
def _consumer_thread_func(self):
"""消费者线程:从队列中取出图像并处理"""
while not self.shutdown_event.is_set() or len(self.image_queue) > 0:
if len(self.image_queue) == 0:
QtCore.QThread.msleep(10) # 队列为空时等待
continue
image_path = self.image_queue.popleft()
future = self.executor.submit(self._process_single_image, image_path)
img_avg_brightness, img_var = future.result() # 阻塞直到当前图像处理完成
with self.processing_lock: # 保护共享变量
self.img_round_brightness_sum += img_avg_brightness
self.img_round_var_sum += img_var
self.processed_img_count += 1
if self.processed_img_count % self.img_batch_size == 0:
avg_brightness_per_img_round = (self.img_round_brightness_sum / self.img_batch_size)
deviation_per_img_round = np.sqrt(self.img_round_var_sum / self.img_batch_size)
self.brightness_history.append(avg_brightness_per_img_round)
self.std_history.append(deviation_per_img_round)
# 发射信号,通知GUI更新
self.data_processed_signal.emit(list(enumerate(self.brightness_history, start=1)), self.brightness_history)
# 重置批次数据
self.img_round_brightness_sum = 0
self.img_round_var_sum = 0
print("Consumer thread shutting down.")
def start_processing(self):
"""启动生产者和消费者线程"""
self.producer_thread = Thread(target=self._producer_thread_func)
self.consumer_thread = Thread(target=self._consumer_thread_func)
self.producer_thread.start()
self.consumer_thread.start()
def stop_processing(self):
"""停止处理线程"""
self.shutdown_event.set()
self.producer_thread.join()
self.consumer_thread.join()
self.executor.shutdown(wait=True)
print("All processing threads stopped.")
原始代码在process_images函数中通过os.listdir(path)获取文件列表,然后循环处理。这种方式存在两个主要问题:
优化方案:并发编程 采用并发编程是解决实时数据处理性能瓶颈的关键。
concurrent.futures.ThreadPoolExecutor允许我们将任务提交给一个线程池,从而实现并行执行。这对于将I/O操作(如读取图像)与CPU密集型操作(如图像处理)分离非常有效。
适用场景: 当处理单个图像的任务可以并行执行,且任务之间没有严格的顺序依赖时。
示例(在ImageProcessor类中已体现): 在_consumer_thread_func中,通过self.executor.submit(self._process_single_image, image_path)将单个图像的处理提交给线程池。future.result()会等待该任务完成并返回结果。
当数据采集速率(生产者)高于处理速率(消费者)时,或者需要确保所有数据都被处理时,生产者-消费者模式是理想选择。一个线程(生产者)负责采集数据并放入队列,另一个或多个线程(消费者)从队列中取出数据进行处理。
适用场景:
示例(在ImageProcessor类中已体现):
这种模式解决了原始代码中os.listdir的“假值”问题,因为生产者会持续监控并添加新文件,而消费者只处理队列中已准备好的文件。processed_files集合确保了文件不会被重复处理。
注意事项:
PyQtGraph(或任何Qt应用)的GUI操作必须在主线程中进行。如果图像处理逻辑在单独的线程中运行,直接从这些线程更新GUI会导致崩溃或不可预测的行为。
问题分析: 原始代码在process_images中直接调用update_scatter(),并在其中执行QtCore.QCoreApplication.processEvents()和QtCore.QThread.msleep(100)。这种做法是错误的,它试图在处理线程中强制处理GUI事件,并且msleep会阻塞整个处理流程。
优化方案:信号与槽(Signals & Slots) Qt的信号与槽机制是实现线程间通信的推荐方式。工作线程在完成任务或有数据需要更新GUI时,发射一个信号,主线程的槽函数接收到信号后进行GUI更新。
示例(在ImageProcessor类中已体现):
# GUI初始化部分
app = QtWidgets.QApplication([])
pw = pg.PlotWidget(title='Mean Brightness vs image round')
# ... 其他GUI设置 ...
# 实例化处理器
image_folder_path = r'C:\Users\blehe\Desktop\Betatron\images'
processor = ImageProcessor(image_folder_path)
# 连接信号到GUI更新函数
def update_plot(x_data, y_data):
# 确保x_data和y_data是列表或可迭代的
if x_data and y_data:
line.setData(x=x_data, y=y_data)
scatter.setData(x=x_data, y=y_data, symbol='o', size=10, pen=pg.mkPen(None), brush=pg.mkBrush(255, 0, 0, 120))
# 清除旧标签并添加新标签(根据需要优化,大量标签会影响性能)
for item in pw.items():
if isinstance(item, pg.TextItem):
pw.removeItem(item)
for i, (xi, yi) in enumerate(zip(x_data, y_data)):
label = pg.TextItem(text=f'{yi:.2f}', anchor=(0, 0))
label.setPos(xi, yi)
pw.addItem(label)
processor.data_以上就是优化实时图像采集与处理系统的性能的详细内容,更多请关注php中文网其它相关文章!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号