
本教程详细介绍了如何利用PySpark处理两个DataFrame之间的缺失值填充问题。通过分步执行左连接操作,并结合`coalesce`函数,我们能够根据不同的匹配键(如邮件或序列号)从源DataFrame中智能地补充目标DataFrame中的缺失数据,同时处理无匹配项的情况,确保数据完整性和准确性。
在数据处理和集成任务中,我们经常需要从一个数据源(通常是更完整或最新的数据)中提取信息来补充另一个数据源中的缺失字段。当补充逻辑涉及多个匹配键和条件判断时,传统的合并操作可能无法直接满足需求。本教程将展示如何使用PySpark的DataFrame API,通过巧妙地结合多次左连接(Left Join)和coalesce函数,实现对缺失值的有条件填充。
假设我们有两个DataFrame:persons和people。persons是我们的主DataFrame,其中包含一些缺失的serial_no(序列号)和mail(邮箱)信息。people是辅助DataFrame,包含了更完整的序列号和邮箱数据。我们的目标是根据以下规则填充persons中的缺失值:
首先,我们创建示例DataFrame来模拟这个场景:
from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, lit
# 初始化SparkSession
spark = SparkSession.builder.appName("FillMissingValues").getOrCreate()
# 创建persons DataFrame
data_persons = [
("John", 25, 100483, "john@example.com"),
("Sam", 49, 448900, "sam@example.com"),
("Will", 63, None, "will@example.com"),
("Robert", 20, 299011, None),
("Hill", 78, None, "hill@example.com")
]
columns_persons = ["name", "age", "serial_no", "mail"]
persons = spark.createDataFrame(data_persons, columns_persons)
# 创建people DataFrame
data_people = [
("John", 100483, "john@example.com"),
("Sam", 448900, "sam@example.com"),
("Will", 229809, "will@example.com"),
("Robert", 299011, None),
("Hill", 567233, "hill@example.com")
]
columns_people = ["name", "s_no", "e_mail"]
people = spark.createDataFrame(data_people, columns_people)
print("原始 persons DataFrame:")
persons.show()
print("原始 people DataFrame:")
people.show()原始 persons DataFrame:
+------+---+---------+----------------+ | name|age|serial_no| mail| +------+---+---------+----------------+ | John| 25| 100483|john@example.com| | Sam| 49| 448900| sam@example.com| | Will| 63| NULL|will@example.com| |Robert| 20| 299011| NULL| | Hill| 78| NULL|hill@example.com| +------+---+---------+----------------+
原始 people DataFrame:
+------+------+----------------+ | name| s_no| e_mail| +------+------+----------------+ | John|100483|john@example.com| | Sam|448900| sam@example.com| | Will|229809|will@example.com| |Robert|299011| NULL| | Hill|567233|hill@example.com| +------+------+----------------+
为了实现复杂的条件填充,我们将分两步进行左连接。每一步专注于填充一个特定的缺失字段。
首先,我们通过persons的mail字段与people的e_mail字段进行左连接,以尝试填充serial_no的缺失值。
# 步骤一:根据mail匹配,填充serial_no
serials_enriched = persons.alias("p") \
.join(people.alias("pe1"), persons["mail"] == people["e_mail"], "left") \
.select(
persons["name"],
persons["age"],
# 使用coalesce函数:优先使用persons的serial_no,如果为null则使用people的s_no,
# 如果两者都为null,则填充"NA"。
coalesce(persons["serial_no"], people["s_no"], lit("NA")).alias("serial_no"),
persons["mail"]
)
print("根据邮箱填充序列号后的 DataFrame:")
serials_enriched.show()serials_enriched DataFrame:
+------+---+---------+----------------+ | name|age|serial_no| mail| +------+---+---------+----------------+ | John| 25| 100483|john@example.com| | Sam| 49| 448900| sam@example.com| | Will| 63| 229809|will@example.com| |Robert| 20| 299011| NULL| | Hill| 78| 567233|hill@example.com| +------+---+---------+----------------+
在这一步中,Will的serial_no成功从people中获取到229809。Robert的mail是NULL,所以无法通过邮箱匹配,其serial_no保持不变(因为persons中已有)。
接下来,我们使用上一步得到的结果serials_enriched,通过serial_no字段与people的s_no字段进行左连接,以尝试填充mail的缺失值。
# 步骤二:根据serial_no匹配,填充mail
mail_enriched = serials_enriched.alias("se") \
.join(people.alias("pe2"), serials_enriched["serial_no"] == people["s_no"], "left") \
.select(
serials_enriched["name"],
serials_enriched["age"],
serials_enriched["serial_no"],
# 使用coalesce函数:优先使用serials_enriched的mail,如果为null则使用people的e_mail,
# 如果两者都为null,则填充"NA"。
coalesce(serials_enriched["mail"], people["e_mail"], lit("NA")).alias("mail")
)
print("最终填充后的 DataFrame:")
mail_enriched.show()mail_enriched DataFrame (最终结果):
+------+---+---------+----------------+ | name|age|serial_no| mail| +------+---+---------+----------------+ | John| 25| 100483|john@example.com| | Sam| 49| 448900| sam@example.com| | Will| 63| 229809|will@example.com| |Robert| 20| 299011| NA| | Hill| 78| 567233|hill@example.com| +------+---+---------+----------------+
在这一步中,Robert的mail成功被填充为NA,因为persons中mail为NULL,且people中对应的e_mail也为NULL。
将上述两个步骤整合到一起,形成完整的解决方案:
from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, lit
# 初始化SparkSession
spark = SparkSession.builder.appName("FillMissingValues").getOrCreate()
# 创建persons DataFrame
data_persons = [
("John", 25, 100483, "john@example.com"),
("Sam", 49, 448900, "sam@example.com"),
("Will", 63, None, "will@example.com"),
("Robert", 20, 299011, None),
("Hill", 78, None, "hill@example.com")
]
columns_persons = ["name", "age", "serial_no", "mail"]
persons = spark.createDataFrame(data_persons, columns_persons)
# 创建people DataFrame
data_people = [
("John", 100483, "john@example.com"),
("Sam", 448900, "sam@example.com"),
("Will", 229809, "will@example.com"),
("Robert", 299011, None),
("Hill", 567233, "hill@example.com")
]
columns_people = ["name", "s_no", "e_mail"]
people = spark.createDataFrame(data_people, columns_people)
# 步骤一:根据mail匹配,填充serial_no
serials_enriched = persons.alias("p") \
.join(people.alias("pe1"), persons["mail"] == people["e_mail"], "left") \
.select(
persons["name"],
persons["age"],
coalesce(persons["serial_no"], people["s_no"], lit("NA")).alias("serial_no"),
persons["mail"]
)
# 步骤二:根据serial_no匹配,填充mail
final_df = serials_enriched.alias("se") \
.join(people.alias("pe2"), serials_enriched["serial_no"] == people["s_no"], "left") \
.select(
serials_enriched["name"],
serials_enriched["age"],
serials_enriched["serial_no"],
coalesce(serials_enriched["mail"], people["e_mail"], lit("NA")).alias("mail")
)
print("最终填充后的 DataFrame:")
final_df.show()
# 停止SparkSession
spark.stop()通过上述方法,我们能够灵活且精确地处理DataFrame中的缺失值填充问题,即使填充逻辑涉及多个匹配条件和源数据表。这种模式在数据清洗和特征工程中非常实用。
以上就是使用PySpark合并DataFrame并有条件地填充缺失值的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号