Kedro与Streamlit集成:动态数据目录下的管道运行实践

花韻仙語
发布: 2025-11-14 12:51:06
原创
999人浏览过

Kedro与Streamlit集成:动态数据目录下的管道运行实践

本文旨在指导开发者如何在streamlit应用中集成并运行kedro数据管道,重点解决如何动态创建并传递自定义`datacatalog`以处理streamlit加载的数据。文章将阐明常见的错误尝试及其原因,并提供一种健壮的方法,通过`kedrosession.run()`的`data_catalog`参数正确地将运行时数据注入kedro管道,从而实现数据处理的无缝衔接。

在构建交互式数据应用时,将强大的数据管道框架(如Kedro)与灵活的Web应用框架(如Streamlit)结合是一种常见的需求。特别是当数据源是动态的,例如用户通过Streamlit界面上传文件时,我们需要一种机制来将这些运行时加载的数据作为输入,传递给Kedro管道进行处理。本文将详细介绍如何实现这一目标,并纠正在此过程中可能遇到的常见误区。

理解Kedro与Streamlit集成中的数据流

Kedro的核心概念之一是DataCatalog,它定义了数据加载和保存的方式。通常,DataCatalog在conf/base/catalog.yml中静态定义。然而,在Streamlit应用中,数据通常在运行时由用户上传,这意味着我们需要一个动态的DataCatalog来封装这些内存中的DataFrame。

目标是将Streamlit中加载的DataFrame包装成MemoryDataSet,然后构建一个临时的DataCatalog,并将其传递给Kedro管道执行。

常见错误尝试与原因分析

在尝试将自定义DataCatalog传递给Kedro管道时,开发者可能会遇到一些AttributeError。理解这些错误的原因对于正确实现集成至关重要。

错误尝试一:直接修改KedroContext或KedroSession的catalog属性

# 错误的代码示例
from kedro.framework.session import KedroSession
from kedro.io import DataCatalog, MemoryDataSet
import pandas as pd
import streamlit as st

# 假设 df1, df2, ... 是在Streamlit中加载的DataFrame
df1 = pd.DataFrame({'col1': [1, 2]})
df2 = pd.DataFrame({'col2': [3, 4]})

if st.button('Processar Dados de Entrada'):
    with KedroSession.create(project_path="./my_kedro_project") as session:
        context = session.load_context()
        # 尝试直接设置context.catalog,这将导致AttributeError
        # context.catalog = DataCatalog({"my_data": MemoryDataSet(df1)}) # AttributeError: can't set attribute 'catalog'

        # 尝试直接设置session.catalog,同样会导致AttributeError
        # session.catalog = DataCatalog({"my_data": MemoryDataSet(df1)}) # AttributeError: can't set attribute 'catalog'

        # ...后续管道运行代码
登录后复制

原因分析:AttributeError: can't set attribute 'catalog'

KedroSession和KedroContext的catalog属性在Kedro内部被设计为只读。这意味着您不能在会话或上下文创建之后,通过直接赋值的方式来修改它们引用的DataCatalog对象。DataCatalog在KedroSession初始化时被加载并冻结,以确保管道执行的一致性和可预测性。尝试直接修改它会违反这一设计原则,从而引发AttributeError。

错误尝试二:通过KedroContext访问pipeline_registry

# 错误的代码示例
from kedro.framework.session import KedroSession
from kedro.runner import SequentialRunner
import streamlit as st

# ... (数据加载和catalog创建) ...

if st.button('Processar Dados de Entrada'):
    with KedroSession.create(project_path="./my_kedro_project") as session:
        context = session.load_context()
        runner = SequentialRunner()
        # 尝试通过context.pipeline_registry获取管道,这将导致AttributeError
        # runner.run(pipeline=context.pipeline_registry.get("tag_web_app"), catalog=custom_catalog) # AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'
登录后复制

原因分析:AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'

KedroContext对象并不直接拥有pipeline_registry属性。管道注册是KedroSession负责管理的一部分。当您通过KedroSession.run()方法执行管道时,会话会自动处理管道的查找和注册。直接从KedroContext中访问pipeline_registry是不符合Kedro设计模式的。

集简云
集简云

软件集成平台,快速建立企业自动化与智能化

集简云 22
查看详情 集简云

正确的方法:通过KedroSession.run()传递自定义DataCatalog

Kedro提供了一种简洁且推荐的方式来在运行时注入自定义DataCatalog,即使用KedroSession.run()方法的data_catalog(或旧版本中的catalog)参数。

步骤一:在Streamlit中加载数据并创建MemoryDataSet

首先,在Streamlit应用中,您需要使用文件上传器或其他方式加载数据,并将其转换为Pandas DataFrame。然后,将这些DataFrame包装成Kedro的MemoryDataSet对象。MemoryDataSet是Kedro提供的一种数据集类型,用于处理内存中的数据,非常适合这种动态场景。

import streamlit as st
import pandas as pd
from kedro.io import DataCatalog, MemoryDataSet
from kedro.framework.session import KedroSession
from pathlib import Path

# 假设你的Kedro项目路径
project_path = Path(__file__).parent / "my_kedro_project" # 根据实际项目结构调整

st.title("Kedro与Streamlit数据处理应用")

uploaded_file1 = st.file_uploader("上传 Reagentes CSV", type=["csv"])
uploaded_file2 = st.file_uploader("上传 Balanço de Massas CSV", type=["csv"])
# ... 可以根据需要添加更多文件上传器

df1, df2, df3, df4, df5, df6 = None, None, None, None, None, None

if uploaded_file1:
    df1 = pd.read_csv(uploaded_file1)
    st.write("Reagentes 数据加载成功:")
    st.dataframe(df1.head())

if uploaded_file2:
    df2 = pd.read_csv(uploaded_file2)
    st.write("Balanço de Massas 数据加载成功:")
    st.dataframe(df2.head())

# 假设还有其他文件加载,这里简化
# df3 = pd.DataFrame(...)
# df4 = pd.DataFrame(...)
# df5 = pd.DataFrame(...)
# df6 = pd.DataFrame(...)
登录后复制

步骤二:构建自定义DataCatalog

当所有必需的DataFrame都加载完毕后,您可以创建一个新的DataCatalog实例,并将这些MemoryDataSet对象作为键值对添加到其中。键名应与您的Kedro管道中期望的数据集名称相匹配。

# ... (承接上一步的代码) ...

if st.button('Processar Dados de Entrada'):
    if df1 is not None and df2 is not None: # 确保所有必要数据都已加载
        # 创建自定义DataCatalog
        custom_catalog = DataCatalog({
            "reagentes_raw": MemoryDataSet(df1),
            "balanco_de_massas_raw": MemoryDataSet(df2),
            # 根据需要添加更多数据集
            # "laboratorio_raw": MemoryDataSet(df3),
            # "laboratorio_raiox_raw": MemoryDataSet(df4),
            # "carta_controle_pims_raw": MemoryDataSet(df5),
            # "blend_raw": MemoryDataSet(df6)
        })

        st.info("正在执行Kedro管道...")

        try:
            # 步骤三:通过KedroSession.run()传递自定义DataCatalog
            with KedroSession.create(project_path=project_path) as session:
                session.run(data_catalog=custom_catalog, pipeline_name="tag_web_app")

            st.success('数据处理成功!')

            # 步骤四:从自定义DataCatalog中加载处理后的结果
            # 假设管道输出一个名为 "merged_raw_data_process" 的数据集
            if "merged_raw_data_process" in custom_catalog.list():
                merged_data = custom_catalog.load("merged_raw_data_process")
                st.header('结果数据预览')
                st.dataframe(merged_data.head())

                # 假设结果数据中有一个时间戳列
                if 'timestamp_column' in merged_data.columns: # 请替换为实际的时间戳列名
                    last_update = merged_data['timestamp_column'].max()
                    st.write(f"最新数据时间: {last_update.strftime('%d/%m/%Y %H:%M')}")
            else:
                st.warning("管道未生成预期的 'merged_raw_data_process' 数据集。")

        except Exception as e:
            st.error(f"Kedro管道执行失败: {e}")
    else:
        st.warning("请上传所有必要的数据文件。")
登录后复制

步骤三:通过KedroSession.run()传递自定义DataCatalog

这是解决问题的关键步骤。KedroSession.run()方法接受一个data_catalog(或旧版本中的catalog)参数,允许您传入一个临时的DataCatalog实例。这个传入的DataCatalog会与项目默认的DataCatalog合并,或者在某些情况下完全覆盖默认的同名数据集定义,从而将您的内存数据注入到管道执行中。

# ... (代码片段已包含在步骤二中) ...
with KedroSession.create(project_path=project_path) as session:
    session.run(data_catalog=custom_catalog, pipeline_name="tag_web_app")
登录后复制

请注意,pipeline_name参数用于指定要运行的特定管道。如果您的Kedro项目只有一个默认管道,可以省略此参数。

步骤四:从自定义DataCatalog中加载处理后的结果

管道执行完成后,如果您的管道配置为将结果保存到MemoryDataSet中(例如,通过在catalog.yml中将输出数据集定义为MemoryDataSet,或者在运行时通过custom_catalog覆盖),您可以直接从传入的custom_catalog中加载这些结果。

# ... (代码片段已包含在步骤二中) ...
merged_data = custom_catalog.load("merged_raw_data_process")
登录后复制

注意事项与最佳实践

  1. Kedro项目结构: 确保您的Streamlit应用能够正确找到Kedro项目的根目录(project_path)。通常,Streamlit应用可以放在Kedro项目之外,但需要正确指定project_path。
  2. MemoryDataSet的使用: MemoryDataSet非常适合处理临时数据。如果需要将处理结果持久化,您可以在DataCatalog中定义其他类型的数据集(如ParquetDataSet、CSVDataSet等),或者在管道的末尾手动保存DataFrame。
  3. 管道定义: 确保您的Kedro管道中的节点能够接收和处理MemoryDataSet提供的数据。输入数据集的名称必须与custom_catalog中定义的键名匹配。
  4. 错误处理: 在实际应用中,务必添加适当的错误处理机制,以捕获Kedro管道执行过程中可能出现的异常,并向用户提供友好的反馈。
  5. 性能考虑: 对于大型数据集,MemoryDataSet可能会消耗大量内存。根据您的数据规模和性能需求,可能需要考虑更高效的数据处理策略,例如分块处理或使用更适合大数据的Kedro数据集类型。
  6. Kedro版本兼容性: 本文中的data_catalog参数适用于较新版本的Kedro。如果您使用的是旧版本,可能需要使用catalog参数。请查阅您的Kedro版本文档以确认正确的参数名称。

总结

通过在KedroSession.run()方法中利用data_catalog参数,我们可以优雅地将Streamlit中加载的动态数据注入到Kedro管道中进行处理。这种方法避免了直接修改Kedro内部只读属性的错误,提供了一种符合Kedro设计哲学且易于维护的集成方案。遵循本文介绍的步骤和最佳实践,您将能够构建出功能强大、交互性强的数据处理应用。

以上就是Kedro与Streamlit集成:动态数据目录下的管道运行实践的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号