
本文探讨了如何在Delta Live Tables (DLT) 中高效地为表中的所有列动态应用数据质量期望,解决了传统方法中手动指定或低效循环的问题。通过利用`@dlt.expect_all_or_fail`装饰器,结合Python动态生成期望字典,用户可以轻松实现表级别的数据质量检查,确保数据完整性,并在不符合预期的行出现时及时中断管道,从而提升数据治理的自动化和可维护性。
Delta Live Tables (DLT) 是Databricks提供的一个声明式框架,用于构建可靠、可维护且可测试的数据管道。其核心功能之一是内置的数据质量期望(Expectations),允许开发者在数据处理的不同阶段定义数据必须满足的条件。这些期望可以用于验证数据的完整性、一致性和准确性,并在数据不符合预期时采取相应的行动(如失败、丢弃或隔离)。
通常,我们在DLT中使用@dlt.expect或@dlt.expect_or_fail等装饰器为表的特定列定义数据质量规则,例如:
import dlt
from pyspark.sql.functions import expr
@dlt.table(
comment="Wikipedia clickstream data cleaned and prepared for analysis."
)
@dlt.expect("valid_current_page_title", "current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_prepared():
return (
dlt.read("clickstream_raw")
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_title", "click_count", "previous_page_title")
)然而,当需要对表中所有列应用相同的期望(例如,所有列都不能为NULL)时,手动为每一列编写@dlt.expect装饰器将变得繁琐且难以维护。
在处理包含大量列的数据集时,如果需要对所有列应用相同的通用数据质量检查(例如,检查所有列是否为NULL),传统方法会面临以下问题:
手动重复: 为每列手动添加@dlt.expect装饰器,代码冗长且易出错。
低效循环: 尝试通过Python循环动态生成多个DLT表定义来模拟对每列的检查,如以下示例所示:
# 这种方法非常低效且不推荐
# for column in columns_list_order_table:
# exec(f'''
# @dlt.table(comment="null value validations for {column}")
# @dlt.expect_or_drop("null values","is_null == false")
# def null_validation_orders_for_column_{column}():
# df = dlt.read("bronze_orders")
# return df.withColumn("is_null", col("{column}").isNull())
# ''')这种exec动态生成函数和表的方式不仅效率低下,而且难以调试和管理,严重违背了DLT声明式管道的初衷。
DLT提供了一个强大的装饰器@dlt.expect_all_or_fail(expectations),专门用于解决需要同时应用多个数据质量期望的场景。这个装饰器接受一个Python字典作为参数,字典的键是期望的描述,值是期望的约束条件。如果任何一行违反了字典中定义的任何一个期望,DLT管道将立即停止执行。
利用这个特性,我们可以通过Python代码动态生成包含所有列期望的字典,然后将其传递给@dlt.expect_all_or_fail。
首先,获取数据表的所有列名,然后遍历这些列名,为每列构造一个非NULL的期望。
# 假设这是您的列名列表
columns_list = ['state', 'store_id', 'product_category', 'SKU', 'price']
expectations = {}
for col_name in columns_list:
# 为每列生成一个名为 "valid_<列名>" 的期望,并检查其是否为 NULL
expectations[f"valid_{col_name}"] = f"{col_name} IS NOT NULL"
# 打印生成的期望字典,以便理解其结构
print(expectations)上述代码将生成一个如下所示的字典:
{'valid_state': 'state IS NOT NULL',
'valid_store_id': 'store_id IS NOT NULL',
'valid_product_category': 'product_category IS NOT NULL',
'valid_SKU': 'SKU IS NOT NULL',
'valid_price': 'price IS NOT NULL'}将生成的expectations字典传递给@dlt.expect_all_or_fail装饰器,并将其应用于您的DLT表定义。
import dlt
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
# 假设这是您的列名列表
columns_list = ['state', 'store_id', 'product_category', 'SKU', 'price']
# 动态生成期望字典
expectations = {}
for col_name in columns_list:
expectations[f"valid_{col_name}"] = f"{col_name} IS NOT NULL"
# 定义表的Schema,这对于从CSV等文件读取数据时非常重要
# 实际应用中,您可能从数据源推断或定义更复杂的Schema
schema = StructType([
StructField("state", StringType(), True),
StructField("store_id", IntegerType(), True),
StructField("product_category", StringType(), True),
StructField("SKU", StringType(), True),
StructField("price", DoubleType(), True)
])
@dlt.table(
comment="原始点击流数据,已应用所有列的非空期望"
)
@dlt.expect_all_or_fail(expectations) # 将动态生成的期望字典应用于表
def clickstream_raw():
# 假设从CSV文件读取数据,请根据实际情况修改路径和格式
# 注意:如果CSV包含标题行,必须设置 .option("header", "true")
return (
spark.read
.schema(schema) # 应用定义的Schema
.option("header", "true") # 如果CSV有标题行,请设置为true
.format("csv")
.load("/path/to/your/csv/data.csv") # 替换为您的数据路径
)
# 在实际的DLT管道中,您不需要手动创建SparkSession,DLT运行时会提供
# 此处仅为示例完整性,展示如何读取数据
# spark = SparkSession.builder.appName("DLTExpectationsExample").getOrCreate()在这个例子中,clickstream_raw表在加载数据时,会检查columns_list中定义的每一列是否为NULL。如果任何一列的任何一行包含NULL值,整个DLT管道将立即失败,从而确保数据的严格质量。
期望行为: @dlt.expect_all_or_fail的“或失败”行为意味着一旦有任何一行违反了任何一个期望,整个管道就会中断。如果您的需求是允许部分脏数据通过,但要记录或隔离它们,可以考虑使用@dlt.expect_all_or_drop(丢弃不符合期望的行)或@dlt.expect_all_or_quarantine(隔离不符合期望的行)。
动态列列表: 在实际生产环境中,columns_list可能不是硬编码的。您可以从数据源的Schema动态获取列名,例如:
# 假设您已经读取了DataFrame 'df'
# columns_list = df.columns
# 或者从Spark Schema中提取
# schema_df = spark.read.format("csv").load("/path/to/data.csv")
# columns_list = [field.name for field in schema_df.schema.fields]数据源配置: 当从文件(如CSV)读取数据时,确保正确配置数据源选项,例如header=true以正确解析列名。错误的配置可能导致列名不匹配,从而使期望失效或产生错误。
期望的复杂性: 虽然示例中展示的是简单的IS NOT NULL检查,但expectations字典中的约束条件可以是任何有效的Spark SQL表达式,允许您定义更复杂的业务规则。
可维护性: 这种动态生成期望的方式极大地提高了代码的可维护性。当表的列发生变化时,只需更新columns_list(或动态获取),而无需修改大量的@dlt.expect装饰器。
通过巧妙地结合Python的动态编程能力和Delta Live Tables的@dlt.expect_all_or_fail装饰器,我们可以高效、声明式地为表中的所有列应用数据质量期望。这种方法不仅避免了手动重复和低效的循环,还提升了数据管道的鲁棒性、可维护性和自动化程度,是构建高质量DLT管道的关键实践。掌握这一技巧,将使您能够更有效地管理大规模数据的数据质量。
以上就是Delta Live Tables:高效地为所有列应用数据质量期望的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号