
本文详细介绍了如何在pyspark中通过多条件连接(multiple conditional joins)和`coalesce`函数,智能地填充一个dataframe中依赖于另一个dataframe的缺失值。教程演示了如何针对不同缺失字段(如序列号和邮件)采用不同的连接键进行分步填充,并处理无匹配情况,确保数据完整性和准确性。
在数据处理过程中,我们经常遇到需要从一个数据源(或DataFrame)中获取信息来补充另一个数据源中的缺失值的情况。本教程将解决一个具体场景:给定两个DataFrame,persons 和 people,我们需要根据特定的业务逻辑填充 persons DataFrame中 serial_no 和 mail 列的缺失值。
具体要求如下:
首先,我们创建两个示例PySpark DataFrame来模拟 persons 和 people 数据。
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce, lit
# 初始化SparkSession
spark = SparkSession.builder.appName("DataFrameMissingValueFill").getOrCreate()
# 创建persons DataFrame
data_persons = [
("John", 25, 100483, "john@example.com"),
("Sam", 49, 448900, "sam@example.com"),
("Will", 63, None, "will@example.com"), # serial_no 缺失
("Robert", 20, 299011, None), # mail 缺失
("Hill", 78, None, "hill@example.com") # serial_no 缺失
]
columns_persons = ["name", "age", "serial_no", "mail"]
persons = spark.createDataFrame(data_persons, columns_persons)
# 创建people DataFrame
data_people = [
("John", 100483, "john@example.com"),
("Sam", 448900, "sam@example.com"),
("Will", 229809, "will@example.com"),
("Robert", 299011, None),
("Hill", 567233, "hill@example.com")
]
columns_people = ["name", "s_no", "e_mail"]
people = spark.createDataFrame(data_people, columns_people)
print("原始 persons DataFrame:")
persons.show()
print("原始 people DataFrame:")
people.show()输出的原始DataFrame如下:
原始 persons DataFrame: +------+---+---------+----------------+ | name|age|serial_no| mail| +------+---+---------+----------------+ | John| 25| 100483|john@example.com| | Sam| 49| 448900| sam@example.com| | Will| 63| null|will@example.com| |Robert| 20| 299011| null| | Hill| 78| null|hill@example.com| +------+---+---------+----------------+ 原始 people DataFrame: +------+------+----------------+ | name| s_no| e_mail| +------+------+----------------+ | John|100483|john@example.com| | Sam|448900| sam@example.com| | Will|229809|will@example.com| |Robert|299011| null| | Hill|567233|hill@example.com| +------+------+----------------+
为了满足上述复杂的填充逻辑,我们将采用分步连接(Sequential Joins)的方法。首先填充 serial_no,然后利用可能已更新的 serial_no 信息填充 mail。coalesce 函数在这里扮演了关键角色,它能够返回其参数列表中的第一个非空表达式。
在这一步中,我们关注 persons DataFrame中 serial_no 列的缺失值。根据需求,如果 serial_no 缺失,我们尝试通过 persons.mail 与 people.e_mail 进行左连接来获取 people.s_no。
# 步骤一:通过邮件地址连接,填充缺失的 serial_no
# 使用别名避免列名冲突
serials_enriched = persons.alias("p").join(
people.alias("pe"),
col("p.mail") == col("pe.e_mail"), # 连接条件:persons的mail与people的e_mail
"left" # 左连接,保留persons所有行
).select(
col("p.name"),
col("p.age"),
# 使用coalesce函数:优先选择p.serial_no,其次是pe.s_no,最后是"NA"
coalesce(col("p.serial_no"), col("pe.s_no"), lit("NA")).alias("serial_no"),
col("p.mail")
)
print("填充 serial_no 后的 DataFrame:")
serials_enriched.show()输出结果:
填充 serial_no 后的 DataFrame: +------+---+---------+----------------+ | name|age|serial_no| mail| +------+---+---------+----------------+ | John| 25| 100483|john@example.com| | Sam| 49| 448900| sam@example.com| | Will| 63| 229809|will@example.com| |Robert| 20| 299011| null| | Hill| 78| 567233|hill@example.com| +------+---+---------+----------------+
可以看到,Will 和 Hill 的 serial_no 已经成功从 people DataFrame中获取并填充。Robert 的 serial_no 本身不缺失,所以保持不变。
现在,我们使用在步骤一中已经填充了 serial_no 的 serials_enriched DataFrame。我们将它与 people DataFrame再次进行左连接,这次的连接条件是 serial_no 与 s_no。目标是填充 mail 列的缺失值。
# 步骤二:通过序列号连接,填充缺失的 mail
# 注意:这里使用上一步生成的 serials_enriched DataFrame
final_df = serials_enriched.alias("se").join(
people.alias("pe"),
col("se.serial_no") == col("pe.s_no"), # 连接条件:serials_enriched的serial_no与people的s_no
"left" # 左连接,保留serials_enriched所有行
).select(
col("se.name"),
col("se.age"),
col("se.serial_no"),
# 使用coalesce函数:优先选择se.mail,其次是pe.e_mail,最后是"NA"
coalesce(col("se.mail"), col("pe.e_mail"), lit("NA")).alias("mail")
)
print("最终填充后的 DataFrame:")
final_df.show()
# 停止SparkSession
spark.stop()输出结果:
最终填充后的 DataFrame: +------+---+---------+----------------+ | name|age|serial_no| mail| +------+---+---------+----------------+ | John| 25| 100483|john@example.com| | Sam| 49| 448900| sam@example.com| | Will| 63| 229809|will@example.com| |Robert| 20| 299011| NA| | Hill| 78| 567233|hill@example.com| +------+---+---------+----------------+
最终结果显示,Robert 的 mail 列被填充为 "NA",因为 people DataFrame中与 Robert 的 s_no (299011) 对应的 e_mail 也是缺失的。其他行的 mail 值保持不变或已成功填充。
为了方便,以下是整合了所有步骤的完整PySpark代码:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce, lit
# 初始化SparkSession
spark = SparkSession.builder.appName("DataFrameMissingValueFill").getOrCreate()
# 创建persons DataFrame
data_persons = [
("John", 25, 100483, "john@example.com"),
("Sam", 49, 448900, "sam@example.com"),
("Will", 63, None, "will@example.com"),
("Robert", 20, 299011, None),
("Hill", 78, None, "hill@example.com")
]
columns_persons = ["name", "age", "serial_no", "mail"]
persons = spark.createDataFrame(data_persons, columns_persons)
# 创建people DataFrame
data_people = [
("John", 100483, "john@example.com"),
("Sam", 448900, "sam@example.com"),
("Will", 229809, "will@example.com"),
("Robert", 299011, None),
("Hill", 567233, "hill@example.com")
]
columns_people = ["name", "s_no", "e_mail"]
people = spark.createDataFrame(data_people, columns_people)
print("原始 persons DataFrame:")
persons.show()
print("原始 people DataFrame:")
people.show()
# 步骤一:通过邮件地址连接,填充缺失的 serial_no
serials_enriched = persons.alias("p").join(
people.alias("pe"),
col("p.mail") == col("pe.e_mail"),
"left"
).select(
col("p.name"),
col("p.age"),
coalesce(col("p.serial_no"), col("pe.s_no"), lit("NA")).alias("serial_no"),
col("p.mail")
)
print("填充 serial_no 后的 DataFrame:")
serials_enriched.show()
# 步骤二:通过序列号连接,填充缺失的 mail
final_df = serials_enriched.alias("se").join(
people.alias("pe"),
col("se.serial_no") == col("pe.s_no"),
"left"
).select(
col("se.name"),
col("se.age"),
col("se.serial_no"),
coalesce(col("se.mail"), col("pe.e_mail"), lit("NA")).alias("mail")
)
print("最终填充后的 DataFrame:")
final_df.show()
# 停止SparkSession
spark.stop()本教程展示了如何利用PySpark的强大功能,通过多步左连接和 coalesce 函数,优雅且高效地解决DataFrame中复杂条件的缺失值填充问题。这种方法不仅能够处理多字段、多条件的填充需求,还能灵活应对无匹配的情况,确保最终数据的完整性和业务逻辑的准确性。掌握这种技术对于处理真实世界中的数据集成和清洗任务至关重要。
以上就是PySpark DataFrame缺失值智能填充策略:基于多条件连接的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号