
本教程详细介绍了如何利用 PySpark 框架,通过多阶段的条件连接(Join)操作,从一个辅助数据帧中智能地填充主数据帧中的缺失值。文章演示了如何根据不同的匹配键(如邮件地址或序列号)有条件地更新字段,并使用 `coalesce` 函数优雅地处理空值,最终将未找到的缺失值替换为指定默认值,确保数据完整性。
在数据处理和集成任务中,我们经常会遇到需要从一个数据源补充另一个数据源中缺失信息的情况。特别是在处理大型数据集时,使用 Apache Spark 的 DataFrame API 能够高效地完成此类任务。本文将深入探讨一种常见场景:如何根据不同的匹配条件,从一个辅助 DataFrame 中智能地填充主 DataFrame 的缺失字段。我们将通过一个具体的 PySpark 示例来演示这一过程,包括如何处理多重连接、条件性填充以及空值处理。
假设我们有两个 DataFrame:persons 和 people。persons 是我们的主数据帧,其中包含 name、age、serial_no 和 mail 字段,但 serial_no 和 mail 字段可能存在缺失值。people 是一个辅助数据帧,也包含 name、s_no(序列号)和 e_mail(邮件地址)字段,可以用来补充 persons 中的缺失信息。
我们的目标是:
首先,我们创建示例数据帧:
from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, lit, col
# 初始化 SparkSession
spark = SparkSession.builder.appName("FillMissingValues").getOrCreate()
# 创建 persons DataFrame
persons_data = [
("John", 25, 100483, "john@example.com"),
("Sam", 49, 448900, "sam@example.com"),
("Will", 63, None, "will@example.com"),
("Robert", 20, 299011, None),
("Hill", 78, None, "hill@example.com")
]
persons_columns = ["name", "age", "serial_no", "mail"]
persons = spark.createDataFrame(persons_data, persons_columns)
print("原始 persons DataFrame:")
persons.show()
# 创建 people DataFrame
people_data = [
("John", 100483, "john@example.com"),
("Sam", 448900, "sam@example.com"),
("Will", 229809, "will@example.com"),
("Robert", 299011, None),
("Hill", 567233, "hill@example.com")
]
people_columns = ["name", "s_no", "e_mail"]
people = spark.createDataFrame(people_data, people_columns)
print("\npeople DataFrame:")
people.show()运行上述代码,将得到如下输出:
原始 persons DataFrame: +------+---+---------+----------------+ | name|age|serial_no| mail| +------+---+---------+----------------+ | John| 25| 100483|john@example.com| | Sam| 49| 448900| sam@example.com| | Will| 63| null|will@example.com| |Robert| 20| 299011| null| | Hill| 78| null|hill@example.com| +------+---+---------+----------------+ people DataFrame: +------+------+----------------+ | name| s_no| e_mail| +------+------+----------------+ | John|100483|john@example.com| | Sam|448900| sam@example.com| | Will|229809|will@example.com| |Robert|299011| null| | Hill|567233|hill@example.com| +------+------+----------------+
为了实现上述目标,我们可以采用分两步进行左连接(left join)的策略。每一步连接都专注于填充一个特定的缺失字段。
首先,我们尝试通过 mail 字段将 persons 与 people 连接,以获取缺失的 serial_no。
# 步骤一:通过邮件地址填充缺失的 serial_no
serials_enriched = persons.alias("p").join(
people.alias("pe"),
col("p.mail") == col("pe.e_mail"),
"left"
).select(
col("p.name"),
col("p.age"),
coalesce(col("p.serial_no"), col("pe.s_no"), lit("NA")).alias("serial_no"),
col("p.mail")
)
print("\n填充 serial_no 后的 DataFrame:")
serials_enriched.show()执行此步骤后,Will 的 serial_no 将被 229809 填充。
填充 serial_no 后的 DataFrame: +------+---+---------+----------------+ | name|age|serial_no| mail| +------+---+---------+----------------+ | John| 25| 100483|john@example.com| | Sam| 49| 448900| sam@example.com| | Will| 63| 229809|will@example.com| |Robert| 20| 299011| null| | Hill| 78| 567233|hill@example.com| +------+---+---------+----------------+
接下来,我们基于 serials_enriched 数据帧(已经填充了部分 serial_no),再次与 people 数据帧进行左连接,这次是为了填充缺失的 mail 字段。
# 步骤二:通过序列号填充缺失的 mail
final_df = serials_enriched.alias("se").join(
people.alias("pe"),
col("se.serial_no") == col("pe.s_no"),
"left"
).select(
col("se.name"),
col("se.age"),
col("se.serial_no"),
coalesce(col("se.mail"), col("pe.e_mail"), lit("NA")).alias("mail")
)
print("\n最终填充后的 DataFrame:")
final_df.show()执行此步骤后,Robert 的 mail 将保持 null,因为 people 中 Robert 的 e_mail 也是 null,最终被替换为 "NA"。
最终填充后的 DataFrame: +------+---+---------+----------------+ | name|age|serial_no| mail| +------+---+---------+----------------+ | John| 25| 100483|john@example.com| | Sam| 49| 448900| sam@example.com| | Will| 63| 229809|will@example.com| |Robert| 20| 299011| NA| | Hill| 78| 567233|hill@example.com| +------+---+---------+----------------+
from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, lit, col
# 初始化 SparkSession
spark = SparkSession.builder.appName("FillMissingValues").getOrCreate()
# 创建 persons DataFrame
persons_data = [
("John", 25, 100483, "john@example.com"),
("Sam", 49, 448900, "sam@example.com"),
("Will", 63, None, "will@example.com"),
("Robert", 20, 299011, None),
("Hill", 78, None, "hill@example.com")
]
persons_columns = ["name", "age", "serial_no", "mail"]
persons = spark.createDataFrame(persons_data, persons_columns)
# 创建 people DataFrame
people_data = [
("John", 100483, "john@example.com"),
("Sam", 448900, "sam@example.com"),
("Will", 229809, "will@example.com"),
("Robert", 299011, None),
("Hill", 567233, "hill@example.com")
]
people_columns = ["name", "s_no", "e_mail"]
people = spark.createDataFrame(people_data, people_columns)
# 步骤一:通过邮件地址填充缺失的 serial_no
# 使用别名避免列名冲突
serials_enriched = persons.alias("p").join(
people.alias("pe_mail_join"), # 为避免后续join的别名冲突,此处使用更具体的别名
col("p.mail") == col("pe_mail_join.e_mail"),
"left"
).select(
col("p.name"),
col("p.age"),
coalesce(col("p.serial_no"), col("pe_mail_join.s_no"), lit("NA")).alias("serial_no"),
col("p.mail")
)
# 步骤二:通过序列号填充缺失的 mail
final_df = serials_enriched.alias("se").join(
people.alias("pe_sno_join"), # 再次为避免别名冲突
col("se.serial_no") == col("pe_sno_join.s_no"),
"left"
).select(
col("se.name"),
col("se.age"),
col("se.serial_no"),
coalesce(col("se.mail"), col("pe_sno_join.e_mail"), lit("NA")).alias("mail")
)
# 显示最终结果
print("\n最终填充后的 DataFrame:")
final_df.show()
# 停止 SparkSession
spark.stop()通过本教程,我们学习了如何使用 PySpark 的 DataFrame API,通过分步左连接和 coalesce 函数来智能地填充主数据帧中的缺失值。这种方法灵活且强大,能够处理复杂的条件性数据填充场景,确保数据的完整性和准确性。理解 coalesce 的工作原理和连接操作的细节是高效处理 Spark 数据转换的关键。
以上就是Spark DataFrame 缺失值智能填充策略的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号