0

0

Dagster资产间数据传递与用户配置管理教程

碧海醫心

碧海醫心

发布时间:2025-11-29 13:05:00

|

500人浏览过

|

来源于php中文网

原创

Dagster资产间数据传递与用户配置管理教程

本教程旨在解决dagster中常见的资产间数据传递和用户自定义配置(config)使用问题。通过详细解析错误案例,展示如何正确地将上游资产的输出作为参数传递给下游资产,并有效利用config对象接收用户定义的运行时参数,从而构建健壮、可配置的dagster数据管道,避免`dagsterinvalidconfigerror`等配置与数据流错误。

在数据工程实践中,我们经常需要构建可配置的数据管道,允许用户在运行时输入参数,例如数据拉取的起始日期或特定的筛选条件。同时,数据管道中的各个步骤(在Dagster中通常表现为“资产”)之间需要高效、明确地传递数据。然而,在Dagster中,如果不正确地处理用户配置和资产间的数据流,可能会遇到诸如DagsterInvalidConfigError之类的错误。本教程将深入探讨如何正确地实现这些功能。

理解Dagster中的资产与配置

Dagster的核心理念之一是“软件定义资产”(Software-Defined Assets)。每个资产都代表数据系统中的一个逻辑实体,并且可以定义其如何被计算。资产之间的依赖关系和数据流是其关键特性。

用户自定义配置 (Config)

Dagster通过Config类提供了一种声明式的方式来定义资产在运行时所需的参数。这些参数可以在Dagster UI中由用户输入,或通过编程方式提供。

from dagster import Config

class FruitConfig(Config):
    fruit_select: str

上述代码定义了一个名为FruitConfig的配置对象,它包含一个字符串类型的参数fruit_select。当一个资产需要此配置时,它会在其函数签名中声明一个类型为FruitConfig的参数。

资产间的数据传递

在Dagster中,资产的输出是其下游资产的输入。这种传递不是通过在下游资产中“调用”上游资产函数来实现的,而是通过将上游资产的输出作为参数注入到下游资产的函数中。

常见错误模式与原因分析

考虑以下不正确的Dagster资产定义,它试图使用用户配置并传递数据:

# 错误示例:不正确的资产定义
import pandas as pd
# ... 其他导入 ...
from dagster import asset, Config

# ... generate_dataset 资产定义 (与正确示例相同,略) ...

class fruit_config(Config):
    fruit_select: str 

@asset(deps=[generate_dataset]) # deps在这里用于数据传递是错误的
def filter_data(config: fruit_config):
    # 错误:不应在此处调用上游资产来获取数据
    df = generate_dataset() 
    df2 = df[df['fruit'] == config.fruit_select]
    print(df2)
    return df2

@asset(deps=[filter_data]) # deps在这里用于数据传递是错误的
def filter_again():
    # 错误:不应在此处调用上游资产来获取数据
    df2 = filter_data() 
    df3 = df2[df2['units'] > 5]
    print(df3)
    return df3

上述代码存在以下主要问题:

  1. 错误的资产数据获取方式: 在filter_data资产中,通过直接调用generate_dataset()来获取上游数据是错误的。在Dagster的资产模型中,上游资产的输出会作为参数自动注入到下游资产中。直接调用会导致每次运行时都重新执行上游资产,并且无法正确建立数据流依赖。同样的问题也存在于filter_again资产中。
  2. deps参数的误用: @asset装饰器中的deps参数用于声明“非数据流”依赖,即一个资产的执行依赖于另一个资产的完成,但不需要其输出数据。如果需要传递数据,应通过函数参数显式声明。
  3. 潜在的配置解析问题: 当filter_data试图在内部调用generate_dataset()时,Dagster的运行时可能无法正确解析filter_data所需的配置,因为其输入签名与实际的数据流期望不符,从而引发DagsterInvalidConfigError。

正确实现:数据流与配置的结合

为了正确地实现用户配置和资产间的数据传递,我们需要遵循Dagster的推荐模式:

PHP房产程序[BBWPS]
PHP房产程序[BBWPS]

[PHP房产程序|BBWPS]功能介绍 1、5种信息类别发布:出租、求租、出售、求购、楼盘信息,支持会员发布信息审核; 2、灵活的信息参数设置; 3、充足的信息字段; 4、简单易用的发布/编辑功能,支持配图上传; 5、灵活的信息管理功能; 6、信息输出伪静态,方便搜索引擎抓取数据; 7、支持RSS输出; 8、内置数据高速缓冲技术,可灵活设置缓冲功能是否启动及过期时间; 9、支持 Google 地图

下载
  1. 将上游资产的输出作为参数传递给下游资产。
  2. 为资产函数的参数和返回值添加类型提示,增强可读性和运行时检查。
  3. 将Config对象作为参数传递给需要配置的资产。

以下是修正后的代码示例:

import pandas as pd
import random
from datetime import datetime, timedelta
from dagster import asset, Config, materialize # materialize用于本地测试

# 1. 定义生成原始数据集的资产
@asset 
def generate_dataset() -> pd.DataFrame:
    """
    生成一个包含水果、单位和日期的随机数据集。
    """
    def random_dates(start_date, end_date, n=10):
        date_range = end_date - start_date
        return [start_date + timedelta(days=random.randint(0, date_range.days)) for _ in range(n)]

    random.seed(42) # 保证可复现性
    num_rows = 100
    fruits = ['Apple', 'Banana', 'Orange', 'Grapes', 'Kiwi']

    df = pd.DataFrame({
        'fruit': [random.choice(fruits) for _ in range(num_rows)],
        'units': [random.randint(1, 10) for _ in range(num_rows)],
        'date': random_dates(datetime(2022, 1, 1), datetime(2022, 12, 31), num_rows)
    })
    print("Generated Dataset:")
    print(df.head())
    return df

# 2. 定义用户配置类
class FruitConfig(Config):
    """
    用户自定义配置,用于选择要筛选的水果。
    """
    fruit_select: str 

# 3. 定义筛选数据的资产,接收上游资产输出和用户配置
@asset 
def filter_data(generate_dataset: pd.DataFrame, config: FruitConfig) -> pd.DataFrame:
    """
    根据用户配置的fruit_select筛选数据集。

    Args:
        generate_dataset (pd.DataFrame): 上游资产generate_dataset的输出。
        config (FruitConfig): 用户提供的配置对象。

    Returns:
        pd.DataFrame: 筛选后的数据集。
    """
    # 直接使用传入的generate_dataset参数
    df_filtered = generate_dataset[generate_dataset['fruit'] == config.fruit_select]
    print(f"\nFiltered Data for '{config.fruit_select}':")
    print(df_filtered.head())
    return df_filtered

# 4. 定义再次筛选的资产,接收上游资产输出
@asset
def filter_again(filter_data: pd.DataFrame) -> pd.DataFrame:
    """
    对上游filter_data资产的输出进行二次筛选(单位大于5)。

    Args:
        filter_data (pd.DataFrame): 上游资产filter_data的输出。

    Returns:
        pd.DataFrame: 再次筛选后的数据集。
    """
    # 直接使用传入的filter_data参数
    df_final = filter_data[filter_data['units'] > 5]
    print("\nFinal Filtered Data (units > 5):")
    print(df_final.head())
    return df_final

# 示例:如何在本地运行包含配置的资产
if __name__ == "__main__":
    # 使用materialize函数在本地运行资产
    # 传递Config的方式是嵌套在'ops'字典中,对应资产名和'config'键
    result = materialize(
        [generate_dataset, filter_data, filter_again],
        run_config={
            "ops": {
                "filter_data": {
                    "config": {
                        "fruit_select": "Banana" # 用户在此处定义参数
                    }
                }
            }
        }
    )
    assert result.success
    print("\nPipeline executed successfully!")

代码解析与最佳实践

  1. 资产函数签名:

    • filter_data(generate_dataset: pd.DataFrame, config: FruitConfig) -> pd.DataFrame:
      • generate_dataset: pd.DataFrame:明确表示filter_data资产依赖于名为generate_dataset的上游资产的输出,并且该输出的类型是pd.DataFrame。Dagster运行时会自动将generate_dataset资产的返回值注入到此参数中。
      • config: FruitConfig:声明此资产需要一个FruitConfig类型的配置对象。用户在Dagster UI或通过run_config提供的值将填充此对象。
      • -> pd.DataFrame:这是Python的类型提示,表明filter_data资产将返回一个pd.DataFrame对象。这对于Dagster理解资产的输出类型至关重要,也增强了代码的可读性。
    • filter_again(filter_data: pd.DataFrame) -> pd.DataFrame:
      • 同样地,filter_again资产接收filter_data资产的输出作为其输入参数。
  2. 移除deps参数:

    • 在正确实现中,@asset装饰器不再需要deps参数来表示数据流依赖。当一个资产函数的参数与另一个资产的名称匹配时,Dagster会自动识别并建立数据流依赖。
  3. 本地运行与配置:

    • 在if __name__ == "__main__":块中,展示了如何使用materialize函数在本地运行这些资产。
    • run_config字典用于提供运行时配置。对于资产级别的配置,它需要嵌套在"ops"键下,然后是资产名称,再是"config"键,最后是配置参数。例如,{"ops": {"filter_data": {"config": {"fruit_select": "Banana"}}}}。

总结

通过本教程,我们学习了在Dagster中构建可配置数据管道的关键原则:

  • 明确的资产输入/输出: 使用函数参数来接收上游资产的输出和用户配置。
  • 类型提示: 强烈建议为资产函数的参数和返回值添加类型提示,这不仅提高了代码的可读性,也帮助Dagster在运行时进行验证。
  • 正确使用Config: 将Config对象作为资产函数的参数,Dagster会自动处理配置的解析和注入。
  • 避免在下游资产中直接调用上游资产: 这种做法违背了Dagster的数据流模型,会导致错误和低效。

遵循这些最佳实践,可以有效地避免常见的配置和数据流错误,构建出更加健壮、可维护的Dagster数据管道。

相关专题

更多
python开发工具
python开发工具

php中文网为大家提供各种python开发工具,好的开发工具,可帮助开发者攻克编程学习中的基础障碍,理解每一行源代码在程序执行时在计算机中的过程。php中文网还为大家带来python相关课程以及相关文章等内容,供大家免费下载使用。

769

2023.06.15

python打包成可执行文件
python打包成可执行文件

本专题为大家带来python打包成可执行文件相关的文章,大家可以免费的下载体验。

661

2023.07.20

python能做什么
python能做什么

python能做的有:可用于开发基于控制台的应用程序、多媒体部分开发、用于开发基于Web的应用程序、使用python处理数据、系统编程等等。本专题为大家提供python相关的各种文章、以及下载和课程。

764

2023.07.25

format在python中的用法
format在python中的用法

Python中的format是一种字符串格式化方法,用于将变量或值插入到字符串中的占位符位置。通过format方法,我们可以动态地构建字符串,使其包含不同值。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

639

2023.07.31

python教程
python教程

Python已成为一门网红语言,即使是在非编程开发者当中,也掀起了一股学习的热潮。本专题为大家带来python教程的相关文章,大家可以免费体验学习。

1325

2023.08.03

python环境变量的配置
python环境变量的配置

Python是一种流行的编程语言,被广泛用于软件开发、数据分析和科学计算等领域。在安装Python之后,我们需要配置环境变量,以便在任何位置都能够访问Python的可执行文件。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

549

2023.08.04

python eval
python eval

eval函数是Python中一个非常强大的函数,它可以将字符串作为Python代码进行执行,实现动态编程的效果。然而,由于其潜在的安全风险和性能问题,需要谨慎使用。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

579

2023.08.04

scratch和python区别
scratch和python区别

scratch和python的区别:1、scratch是一种专为初学者设计的图形化编程语言,python是一种文本编程语言;2、scratch使用的是基于积木的编程语法,python采用更加传统的文本编程语法等等。本专题为大家提供scratch和python相关的文章、下载、课程内容,供大家免费下载体验。

709

2023.08.11

Java编译相关教程合集
Java编译相关教程合集

本专题整合了Java编译相关教程,阅读专题下面的文章了解更多详细内容。

9

2026.01.21

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 10.6万人学习

Django 教程
Django 教程

共28课时 | 3.3万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号