
本教程详细阐述了如何在Streamlit应用中有效集成Kedro数据管道,实现动态数据加载与处理。核心在于通过KedroSession.run()方法的data_catalog参数传递自定义的DataCatalog,以管理Streamlit中加载的DataFrame数据。文章还深入分析了常见的集成误区,如直接修改KedroContext属性,并提供了正确的代码示例和最佳实践,确保数据流的顺畅与高效。
在现代数据应用开发中,数据管道的自动化与交互式界面的结合变得日益重要。Kedro作为一个生产级的数据管道框架,能够帮助我们构建可维护、可测试和可重用的数据处理逻辑。而Streamlit则以其简洁的API,使得Python开发者能够快速构建美观的数据应用。将Kedro管道集成到Streamlit应用中,可以实现用户通过Web界面上传数据,并实时触发复杂的Kedro数据处理流程,从而构建功能强大且用户友好的数据产品。
本教程的目标是指导您如何在Streamlit应用中运行特定的Kedro管道,并向该管道传递在Streamlit中动态加载的数据,通过自定义的DataCatalog进行管理。
在深入集成之前,理解Kedro的两个核心概念至关重要:
在Streamlit中运行Kedro管道并传递动态数据,最核心且正确的方法是利用KedroSession.run()方法的data_catalog参数。这个参数允许您在运行时提供一个临时的、自定义的DataCatalog,它将覆盖或扩展Kedro项目默认的catalog.yml中定义的同名数据集。
以下是一个在Streamlit中集成Kedro管道的完整示例,展示了如何动态加载数据并传递给Kedro:
import streamlit as st
import pandas as pd
from kedro.framework.session import KedroSession
from kedro.io import DataCatalog, MemoryDataSet
import os
# 假设您的Kedro项目位于当前工作目录下的 'my_kedro_project'
# 请根据实际情况调整 project_path
project_path = os.path.join(os.getcwd(), 'my_kedro_project')
st.title("Kedro与Streamlit数据处理应用")
st.header("上传您的数据")
# 模拟Streamlit文件上传和DataFrame创建
# 在实际应用中,这里会是 st.file_uploader 和 pd.read_csv/excel 等
uploaded_file1 = st.file_uploader("上传 reagentes_raw.csv", type=['csv'])
uploaded_file2 = st.file_uploader("上传 balanco_de_massas_raw.csv", type=['csv'])
# ... 更多文件上传器
df1, df2, df3, df4, df5, df6 = None, None, None, None, None, None
if uploaded_file1:
df1 = pd.read_csv(uploaded_file1)
st.write("reagentes_raw 数据预览:")
st.dataframe(df1.head())
if uploaded_file2:
df2 = pd.read_csv(uploaded_file2)
st.write("balanco_de_massas_raw 数据预览:")
st.dataframe(df2.head())
# ... 处理其他上传文件
# 确保所有必需的DataFrame都已加载 (这里仅为演示,实际应根据管道输入进行检查)
if st.button('处理输入数据') and df1 is not None and df2 is not None: # 简化检查
st.info('正在执行Kedro管道...')
# 模拟其他DataFrame,实际应通过上传获取
if df3 is None: df3 = pd.DataFrame({'colA': [1,2], 'colB': ['x','y']})
if df4 is None: df4 = pd.DataFrame({'colC': [3,4], 'colD': ['a','b']})
if df5 is None: df5 = pd.DataFrame({'colE': [5,6], 'colF': ['m','n']})
if df6 is None: df6 = pd.DataFrame({'colG': [7,8], 'colH': ['p','q']})
try:
# 创建自定义DataCatalog,包含MemoryDataSet
custom_catalog = DataCatalog({
"reagentes_raw": MemoryDataSet(df1),
"balanco_de_massas_raw": MemoryDataSet(df2),
"laboratorio_raw": MemoryDataSet(df3), # 示例数据
"laboratorio_raiox_raw": MemoryDataSet(df4), # 示例数据
"carta_controle_pims_raw": MemoryDataSet(df5), # 示例数据
"blend_raw": MemoryDataSet(df6) # 示例数据
})
# 创建KedroSession并运行管道
with KedroSession.create(project_path=project_path) as session:
# 关键:通过 data_catalog 参数传递自定义目录
session.run(data_catalog=custom_catalog, pipeline_name="tag_web_app")
st.success('数据处理成功!')
# 从自定义的catalog中加载管道的输出数据
# 假设管道的输出数据集名为 "merged_raw_data_process"
merged_data = custom_catalog.load("merged_raw_data_process")
st.header('处理结果预览')
st.dataframe(merged_data.head())
# 示例:显示最后更新时间,假设输出数据包含 'timestamp' 列
if 'timestamp' in merged_data.columns:
last_update = pd.to_datetime(merged_data['timestamp']).max()
st.write(f"数据集中最新信息的时间: {last_update.strftime('%Y/%m/%d %H:%M:%S')}")
else:
st.write("输出数据中未找到 'timestamp' 列。")
except Exception as e:
st.error(f"Kedro管道执行失败: {e}")
st.exception(e)
注意:
当Kedro管道通过session.run(data_catalog=custom_catalog, ...)执行完毕后,管道的输出数据集(如果它们被定义为写入catalog)将存储在您传入的custom_catalog对象中。这意味着,您可以直接从该custom_catalog实例中加载管道处理后的结果,并在Streamlit应用中进行展示或进一步处理。
如上例所示:
merged_data = custom_catalog.load("merged_raw_data_process")
st.dataframe(merged_data.head())这行代码从之前传入的custom_catalog中加载了名为merged_raw_data_process的数据集,该数据集是Kedro管道的最终输出。
在集成Kedro与Streamlit时,开发者可能会遇到一些常见的AttributeError。这些错误通常源于尝试以不正确的方式修改Kedro的内部状态。
问题描述: 尝试直接对KedroSession或KedroContext的catalog属性进行赋值操作,例如 context.catalog = custom_catalog。
错误原因: KedroSession.catalog和KedroContext.catalog属性在Kedro的设计中是只读的。它们在会话或上下文创建时被初始化,并且不应该在运行时被直接外部修改。Kedro通过配置(catalog.yml)和session.run()方法的参数来管理数据目录的生命周期和内容。
解决方案: 绝对不要尝试直接设置context.catalog。正确的做法是,在调用session.run()时,通过data_catalog参数传递您自定义的DataCatalog。如前文示例所示:
with KedroSession.create(project_path=project_path) as session:
session.run(data_catalog=custom_catalog, pipeline_name="tag_web_app")这种方式是Kedro官方推荐且唯一支持的在运行时注入自定义数据目录的方法。
问题描述: 尝试从KedroContext对象中访问一个名为pipeline_registry的属性,例如 context.pipeline_registry.get("tag_web_app")。
错误原因: KedroContext对象本身不直接暴露pipeline_registry属性。管道的注册和管理是Kedro内部框架的一部分,通常通过KedroSession的run()方法或context.pipelines属性来间接访问和执行。尝试直接访问pipeline_registry是错误的API使用方式。
解决方案: 避免直接操作pipeline_registry。如果您需要运行特定的管道,只需在session.run()方法中通过pipeline_name参数指定即可:
with KedroSession.create(project_path=project_path) as session:
session.run(pipeline_name="tag_web_app", data_catalog=custom_catalog)如果您确实需要获取管道对象(例如用于更高级的调试或自定义运行),可以通过context.pipelines字典来访问,例如 context.pipelines["tag_web_app"],但通常情况下,直接使用session.run()更为简洁和推荐。
将Kedro的强大数据管道能力与Streamlit的便捷交互界面相结合,能够为数据科学家和工程师提供一个高效且灵活的开发环境。本教程强调了在Streamlit应用中通过KedroSession.run()方法的data_catalog参数传递自定义DataCatalog的正确方法,这是处理动态数据的核心。同时,通过深入解析常见的AttributeError,我们明确了Kedro的API设计原则,即避免直接修改只读属性或访问不存在的内部组件。遵循这些指导原则和最佳实践,您将能够构建稳定、高效且易于维护的Kedro-Streamlit集成应用。
以上就是Kedro与Streamlit集成:构建动态数据管道的实践指南的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号