
在开发基于python shiny的交互式应用时,我们经常会遇到需要在响应式函数中执行一些耗时操作的场景,例如通过串口发送一系列指令来控制外部设备(如泵)。当这些操作包含一个长时间运行的循环时,如果处理不当,会发现整个应用程序的用户界面(ui)会变得无响应。这意味着用户点击其他按钮或进行其他交互时,应用无法立即响应,而是等待当前耗时操作完成后才处理后续事件。
考虑一个控制流体泵的Shiny应用示例:用户点击“启动泵”按钮(input.p1)后,应用会通过串口发送一系列电压指令,每隔2秒发送一次,持续一段时间。同时,用户希望能够随时点击“停止泵”按钮(input.p2)来立即中断当前的发送过程。
最初的实现可能如下所示:
import time
import serial
from shiny import reactive, App, ui
# 假设ser是已初始化的串口对象
# ser = serial.Serial("COM6", 115200)
@reactive.Effect
@reactive.event(input.p1)
def start_pump():
y = yg.get() # 从响应式值获取电压数组
for e in y: # 遍历数组发送指令
msg = "1:1:"+str(e)+":100"
ser.write(bytes(msg,'utf-8'))
t0 = time.time()
while(((time.time()-t0)<=2)): # 阻塞等待2秒
pass
ser.write(bytes("0:1",'utf-8')) # 传输结束后停止泵
@reactive.Effect
@reactive.event(input.p2)
def stop_pump():
ser.write(bytes("0:1",'utf-8')) # 停止泵在这种实现中,当input.p1触发start_pump函数时,其中的for循环和内部的while循环会完全阻塞Shiny应用的UI线程。这意味着在start_pump函数执行期间,即使点击input.p2,stop_pump函数也不会立即执行,而是会被Shiny的事件队列排队,直到start_pump函数完全执行完毕。这导致用户无法即时中断泵的运行,从而严重影响用户体验和应用的实时控制能力。
为了解决UI阻塞和响应性丢失的问题,核心思想是将耗时的操作从Shiny应用的UI线程中分离出来,放到一个独立的后台线程中执行。这样,UI线程可以继续处理用户输入,而后台线程则负责执行耗时任务。同时,我们需要一种机制让UI线程能够与后台线程通信,以便在需要时(例如用户点击“停止泵”按钮)能够安全地中断后台线程的执行。
立即学习“Python免费学习笔记(深入)”;
Python的threading模块和threading.Event类是实现这一目标的理想工具。
首先,我们需要将原先阻塞UI的循环逻辑封装到一个独立的函数中,这个函数将作为新线程的执行目标。该函数需要接收一个threading.Event对象作为参数,以便能够检查停止信号。
import threading as th
import numpy as np
import time
import serial
# 假设ser是已初始化的串口对象
# ser = serial.Serial("COM6", 115200)
# 辅助函数:发送单个指令
def transmit(e, ser_port):
"""根据给定电压值e,格式化消息并发送到串口"""
msg = "1:1:" + str(e) + ":100"
ser_port.write(bytes(msg, 'utf-8'))
# 线程目标函数:执行耗时循环
def rtimer(y_values, sflag_event, ser_port):
"""
在独立线程中执行泵的传输循环。
y_values: 驱动电压数组。
sflag_event: threading.Event对象,用于接收停止信号。
ser_port: 串口对象。
"""
i = 0
# 循环条件:未遍历完数组且未收到停止信号
while i < np.size(y_values) and not sflag_event.is_set():
transmit(y_values[i], ser_port)
i += 1
time.sleep(2) # 模拟耗时操作,此处为2秒间隔
# 循环结束后,如果不是因为停止信号中断,则发送停止指令
if not sflag_event.is_set():
ser_port.write(bytes("0:1", 'utf-8')) # 正常结束时停止泵在rtimer函数中,关键在于while i < np.size(y_values) and not sflag_event.is_set():这一行。它确保了循环不仅在遍历完所有数据后结束,也能在sflag_event被设置(即收到停止信号)时立即中断。time.sleep(2)用于模拟每次发送之间的2秒间隔,这是一个阻塞操作,但因为它在一个单独的线程中,所以不会阻塞UI。
接下来,我们需要修改Shiny应用中的响应式函数,以便它们能够启动和停止这个后台线程。
from shiny import App, render, ui, reactive
# ... 其他导入和初始化,如ser串口对象 ...
def server(input, output, session):
# 初始化一个threading.Event对象,用于线程间通信
sflag = th.Event()
# 假设yg是存储电压数组的reactive.Value
yg = reactive.Value(np.array([50, 60, 70, 80, 90, 100])) # 示例数据
# ... transmit 和 rtimer 函数定义放在这里或外部 ...
@reactive.Effect()
@reactive.event(input.p1)
def start_pump_threaded():
"""
处理“启动泵”按钮点击事件。
清除停止信号,创建并启动新线程。
"""
y = yg.get()
sflag.clear() # 确保停止信号是清除状态
# 创建一个新线程,目标是rtimer函数,并传递参数
timer_thread = th.Thread(target=rtimer, args=[y, sflag, ser])
timer_thread.start() # 启动线程
@reactive.Effect()
@reactive.event(input.p2)
def stop_pump_threaded():
"""
处理“停止泵”按钮点击事件。
设置停止信号,并立即发送停止指令到串口。
"""
sflag.set() # 设置停止信号,通知后台线程停止
ser.write(bytes("1:0", 'utf-8')) # 立即发送停止指令到串口在start_pump_threaded函数中:
在stop_pump_threaded函数中:
为了更全面地展示,以下是包含上述核心逻辑的Shiny应用服务器端代码片段:
from shiny import App, render, ui, reactive
import serial
import time
import numpy as np
import threading as th
# 假设串口已正确初始化
# 注意:在实际应用中,串口对象的管理可能需要更复杂的策略,
# 例如确保在应用关闭时正确关闭串口。
try:
ser = serial.Serial("COM6", 115200)
except serial.SerialException as e:
print(f"Error opening serial port: {e}. Please check port availability.")
# 在实际应用中,这里可能需要更优雅的错误处理,例如禁用相关UI元素
ser = None # 确保ser在无法打开时为None,防止后续操作报错
# 辅助函数:发送单个指令
def transmit(e, ser_port):
if ser_port and ser_port.is_open:
msg = "1:1:" + str(e) + ":100"
print(f"Sending: {msg}") # 用于调试
ser_port.write(bytes(msg, 'utf-8'))
else:
print("Serial port not open, cannot transmit.")
# 线程目标函数:执行耗时循环
def rtimer(y_values, sflag_event, ser_port):
print("Pump transmission thread started.")
i = 0
while i < np.size(y_values) and not sflag_event.is_set():
transmit(y_values[i], ser_port)
i += 1
time.sleep(2) # 模拟2秒间隔
# 循环结束后,根据中断原因进行处理
if sflag_event.is_set():
print("Pump transmission interrupted by stop signal.")
else:
print("Pump transmission completed normally.")
if ser_port and ser_port.is_open:
ser_port.write(bytes("0:1", 'utf-8')) # 正常结束时停止泵
def server(input, output, session):
# 用于存储用户配置的电压数据
yg = reactive.Value(np.array([]))
# 初始化一个threading.Event对象,用于线程间通信
sflag = th.Event()
# 示例UI元素,用于生成yg数据
@reactive.Effect
@reactive.event(input.AK, input.TK) # 假设这些输入控制生成yg
def update_yg_example():
# 这是一个简化示例,实际yg的生成逻辑应根据你的应用来
if input.AK() is not None and input.TK() is not None:
x = np.arange(0, input.TK() + 2, 2)
y = np.ones(np.size(x)) * input.AK()
yg.set(np.rint(y).astype(int))
print(f"yg updated: {yg.get()}")
@reactive.Effect()
@reactive.event(input.p1)
def start_pump_handler():
"""处理“启动泵”按钮点击事件"""
if ser is None or not ser.is_open:
print("Serial port not available. Cannot start pump.")
return
y = yg.get()
if y.size == 0:
print("No pump profile data (yg) available to transmit.")
return
sflag.clear() # 清除之前的停止信号
# 创建并启动新线程
timer_thread = th.Thread(target=rtimer, args=[y, sflag, ser])
timer_thread.start()
print("Pump start command issued. Threading started.")
@reactive.Effect()
@reactive.event(input.p2)
def stop_pump_handler():
"""处理“停止泵”按钮点击事件"""
if ser is None or not ser.is_open:
print("Serial port not available. Cannot stop pump.")
return
sflag.set() # 设置停止信号,通知后台线程停止
ser.write(bytes("1:0", 'utf-8')) # 立即发送停止指令到串口
print("Pump stop command issued. Stop signal sent to thread.")
# 更多Shiny UI和服务器逻辑...
# 例如,你的UI定义:
# app_ui = ui.page_fluid(
# ui.input_numeric("AK", "Amplitude [V]", value=100),
# ui.input_numeric("TK", "Runtime [s]", value=10),
# ui.input_action_button("p1", "Pumpe Start"),
# ui.input_action_button("p2", "Pumpe Stopp")
# )
# app = App(app_ui, server)在Python Shiny应用中处理耗时操作并保持UI响应性是开发交互式应用的关键。通过将这些操作封装在独立的线程中,并利用threading.Event机制进行线程间通信,我们能够有效地避免UI阻塞,实现即时中断,从而显著提升用户体验。这种模式不仅适用于串口通信,也适用于其他任何可能长时间运行并阻塞主线程的任务。理解UI线程和工作线程之间的区别,并选择合适的并发工具,是构建高性能、响应式Shiny应用的基础。
以上就是Python Shiny:在响应式函数中处理耗时循环并保持应用响应性的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号