Delta Live Tables:高效地为所有列应用数据质量期望

DDD
发布: 2025-12-02 11:31:01
原创
549人浏览过

delta live tables:高效地为所有列应用数据质量期望

本文探讨了如何在Delta Live Tables (DLT) 中高效地为表中的所有列动态应用数据质量期望,解决了传统方法中手动指定或低效循环的问题。通过利用`@dlt.expect_all_or_fail`装饰器,结合Python动态生成期望字典,用户可以轻松实现表级别的数据质量检查,确保数据完整性,并在不符合预期的行出现时及时中断管道,从而提升数据治理的自动化和可维护性。

引言:Delta Live Tables与数据质量期望

Delta Live Tables (DLT) 是Databricks提供的一个声明式框架,用于构建可靠、可维护且可测试的数据管道。其核心功能之一是内置的数据质量期望(Expectations),允许开发者在数据处理的不同阶段定义数据必须满足的条件。这些期望可以用于验证数据的完整性、一致性和准确性,并在数据不符合预期时采取相应的行动(如失败、丢弃或隔离)。

通常,我们在DLT中使用@dlt.expect或@dlt.expect_or_fail等装饰器为表的特定列定义数据质量规则,例如:

import dlt
from pyspark.sql.functions import expr

@dlt.table(
  comment="Wikipedia clickstream data cleaned and prepared for analysis."
)
@dlt.expect("valid_current_page_title", "current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_prepared():
  return (
    dlt.read("clickstream_raw")
      .withColumn("click_count", expr("CAST(n AS INT)"))
      .withColumnRenamed("curr_title", "current_page_title")
      .withColumnRenamed("prev_title", "previous_page_title")
      .select("current_page_title", "click_count", "previous_page_title")
  )
登录后复制

然而,当需要对表中所有列应用相同的期望(例如,所有列都不能为NULL)时,手动为每一列编写@dlt.expect装饰器将变得繁琐且难以维护。

挑战:为所有列动态应用期望

在处理包含大量列的数据集时,如果需要对所有列应用相同的通用数据质量检查(例如,检查所有列是否为NULL),传统方法会面临以下问题:

  1. 手动重复: 为每列手动添加@dlt.expect装饰器,代码冗长且易出错。

  2. 低效循环: 尝试通过Python循环动态生成多个DLT表定义来模拟对每列的检查,如以下示例所示:

    # 这种方法非常低效且不推荐
    # for column in columns_list_order_table:     
    #     exec(f'''
    # @dlt.table(comment="null value validations for {column}")
    # @dlt.expect_or_drop("null values","is_null == false")
    # def null_validation_orders_for_column_{column}():
    #     df = dlt.read("bronze_orders")
    #     return df.withColumn("is_null", col("{column}").isNull())
    # ''')
    登录后复制

    这种exec动态生成函数和表的方式不仅效率低下,而且难以调试和管理,严重违背了DLT声明式管道的初衷。

解决方案:使用 @dlt.expect_all_or_fail 动态应用期望

DLT提供了一个强大的装饰器@dlt.expect_all_or_fail(expectations),专门用于解决需要同时应用多个数据质量期望的场景。这个装饰器接受一个Python字典作为参数,字典的键是期望的描述,值是期望的约束条件。如果任何一行违反了字典中定义的任何一个期望,DLT管道将立即停止执行。

利用这个特性,我们可以通过Python代码动态生成包含所有列期望的字典,然后将其传递给@dlt.expect_all_or_fail。

步骤一:动态生成期望字典

首先,获取数据表的所有列名,然后遍历这些列名,为每列构造一个非NULL的期望。

千帆AppBuilder
千帆AppBuilder

百度推出的一站式的AI原生应用开发资源和工具平台,致力于实现人人都能开发自己的AI原生应用。

千帆AppBuilder 174
查看详情 千帆AppBuilder
# 假设这是您的列名列表
columns_list = ['state', 'store_id', 'product_category', 'SKU', 'price']

expectations = {}
for col_name in columns_list:
    # 为每列生成一个名为 "valid_<列名>" 的期望,并检查其是否为 NULL
    expectations[f"valid_{col_name}"] = f"{col_name} IS NOT NULL"

# 打印生成的期望字典,以便理解其结构
print(expectations)
登录后复制

上述代码将生成一个如下所示的字典:

{'valid_state': 'state IS NOT NULL', 
'valid_store_id': 'store_id IS NOT NULL',
'valid_product_category': 'product_category IS NOT NULL', 
'valid_SKU': 'SKU IS NOT NULL',
'valid_price': 'price IS NOT NULL'}
登录后复制

步骤二:将字典应用于 DLT 表

将生成的expectations字典传递给@dlt.expect_all_or_fail装饰器,并将其应用于您的DLT表定义。

import dlt
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# 假设这是您的列名列表
columns_list = ['state', 'store_id', 'product_category', 'SKU', 'price']

# 动态生成期望字典
expectations = {}
for col_name in columns_list:
    expectations[f"valid_{col_name}"] = f"{col_name} IS NOT NULL"

# 定义表的Schema,这对于从CSV等文件读取数据时非常重要
# 实际应用中,您可能从数据源推断或定义更复杂的Schema
schema = StructType([
    StructField("state", StringType(), True),
    StructField("store_id", IntegerType(), True),
    StructField("product_category", StringType(), True),
    StructField("SKU", StringType(), True),
    StructField("price", DoubleType(), True)
])

@dlt.table(
  comment="原始点击流数据,已应用所有列的非空期望"
)
@dlt.expect_all_or_fail(expectations) # 将动态生成的期望字典应用于表
def clickstream_raw():
  # 假设从CSV文件读取数据,请根据实际情况修改路径和格式
  # 注意:如果CSV包含标题行,必须设置 .option("header", "true")
  return (
    spark.read
      .schema(schema) # 应用定义的Schema
      .option("header", "true") # 如果CSV有标题行,请设置为true
      .format("csv")
      .load("/path/to/your/csv/data.csv") # 替换为您的数据路径
  )

# 在实际的DLT管道中,您不需要手动创建SparkSession,DLT运行时会提供
# 此处仅为示例完整性,展示如何读取数据
# spark = SparkSession.builder.appName("DLTExpectationsExample").getOrCreate()
登录后复制

在这个例子中,clickstream_raw表在加载数据时,会检查columns_list中定义的每一列是否为NULL。如果任何一列的任何一行包含NULL值,整个DLT管道将立即失败,从而确保数据的严格质量。

注意事项与最佳实践

  1. 期望行为: @dlt.expect_all_or_fail的“或失败”行为意味着一旦有任何一行违反了任何一个期望,整个管道就会中断。如果您的需求是允许部分脏数据通过,但要记录或隔离它们,可以考虑使用@dlt.expect_all_or_drop(丢弃不符合期望的行)或@dlt.expect_all_or_quarantine(隔离不符合期望的行)。

  2. 动态列列表: 在实际生产环境中,columns_list可能不是硬编码的。您可以从数据源的Schema动态获取列名,例如:

    # 假设您已经读取了DataFrame 'df'
    # columns_list = df.columns
    # 或者从Spark Schema中提取
    # schema_df = spark.read.format("csv").load("/path/to/data.csv")
    # columns_list = [field.name for field in schema_df.schema.fields]
    登录后复制
  3. 数据源配置: 当从文件(如CSV)读取数据时,确保正确配置数据源选项,例如header=true以正确解析列名。错误的配置可能导致列名不匹配,从而使期望失效或产生错误。

  4. 期望的复杂性: 虽然示例中展示的是简单的IS NOT NULL检查,但expectations字典中的约束条件可以是任何有效的Spark SQL表达式,允许您定义更复杂的业务规则。

  5. 可维护性: 这种动态生成期望的方式极大地提高了代码的可维护性。当表的列发生变化时,只需更新columns_list(或动态获取),而无需修改大量的@dlt.expect装饰器。

总结

通过巧妙地结合Python的动态编程能力和Delta Live Tables的@dlt.expect_all_or_fail装饰器,我们可以高效、声明式地为表中的所有列应用数据质量期望。这种方法不仅避免了手动重复和低效的循环,还提升了数据管道的鲁棒性、可维护性和自动化程度,是构建高质量DLT管道的关键实践。掌握这一技巧,将使您能够更有效地管理大规模数据的数据质量。

以上就是Delta Live Tables:高效地为所有列应用数据质量期望的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号