使用PySpark合并DataFrame并有条件地填充缺失值

花韻仙語
发布: 2025-10-14 10:05:01
原创
479人浏览过

使用pyspark合并dataframe并有条件地填充缺失值

本教程详细介绍了如何利用PySpark处理两个DataFrame之间的缺失值填充问题。通过分步执行左连接操作,并结合`coalesce`函数,我们能够根据不同的匹配键(如邮件或序列号)从源DataFrame中智能地补充目标DataFrame中的缺失数据,同时处理无匹配项的情况,确保数据完整性和准确性。

引言

在数据处理和集成任务中,我们经常需要从一个数据源(通常是更完整或最新的数据)中提取信息来补充另一个数据源中的缺失字段。当补充逻辑涉及多个匹配键和条件判断时,传统的合并操作可能无法直接满足需求。本教程将展示如何使用PySpark的DataFrame API,通过巧妙地结合多次左连接(Left Join)和coalesce函数,实现对缺失值的有条件填充。

问题描述与数据准备

假设我们有两个DataFrame:persons和people。persons是我们的主DataFrame,其中包含一些缺失的serial_no(序列号)和mail(邮箱)信息。people是辅助DataFrame,包含了更完整的序列号和邮箱数据。我们的目标是根据以下规则填充persons中的缺失值:

  1. 如果persons中serial_no缺失,尝试通过mail字段与people中的e_mail匹配来获取s_no(序列号)。
  2. 如果persons中mail缺失,尝试通过serial_no字段与people中的s_no匹配来获取e_mail(邮箱)。
  3. 如果以上匹配均未找到,则填充为字符串"NA"。

首先,我们创建示例DataFrame来模拟这个场景:

from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, lit

# 初始化SparkSession
spark = SparkSession.builder.appName("FillMissingValues").getOrCreate()

# 创建persons DataFrame
data_persons = [
    ("John", 25, 100483, "john@example.com"),
    ("Sam", 49, 448900, "sam@example.com"),
    ("Will", 63, None, "will@example.com"),
    ("Robert", 20, 299011, None),
    ("Hill", 78, None, "hill@example.com")
]
columns_persons = ["name", "age", "serial_no", "mail"]
persons = spark.createDataFrame(data_persons, columns_persons)

# 创建people DataFrame
data_people = [
    ("John", 100483, "john@example.com"),
    ("Sam", 448900, "sam@example.com"),
    ("Will", 229809, "will@example.com"),
    ("Robert", 299011, None),
    ("Hill", 567233, "hill@example.com")
]
columns_people = ["name", "s_no", "e_mail"]
people = spark.createDataFrame(data_people, columns_people)

print("原始 persons DataFrame:")
persons.show()
print("原始 people DataFrame:")
people.show()
登录后复制

原始 persons DataFrame:

+------+---+---------+----------------+
|  name|age|serial_no|            mail|
+------+---+---------+----------------+
|  John| 25|   100483|john@example.com|
|   Sam| 49|   448900| sam@example.com|
|  Will| 63|     NULL|will@example.com|
|Robert| 20|   299011|            NULL|
|  Hill| 78|     NULL|hill@example.com|
+------+---+---------+----------------+
登录后复制

原始 people DataFrame:

+------+------+----------------+
|  name|  s_no|          e_mail|
+------+------+----------------+
|  John|100483|john@example.com|
|   Sam|448900| sam@example.com|
|  Will|229809|will@example.com|
|Robert|299011|            NULL|
|  Hill|567233|hill@example.com|
+------+------+----------------+
登录后复制

解决方案:分步左连接与coalesce

为了实现复杂的条件填充,我们将分两步进行左连接。每一步专注于填充一个特定的缺失字段。

步骤一:根据邮箱填充缺失的序列号

首先,我们通过persons的mail字段与people的e_mail字段进行左连接,以尝试填充serial_no的缺失值。

有道小P
有道小P

有道小P,新一代AI全科学习助手,在学习中遇到任何问题都可以问我。

有道小P64
查看详情 有道小P
# 步骤一:根据mail匹配,填充serial_no
serials_enriched = persons.alias("p") \
    .join(people.alias("pe1"), persons["mail"] == people["e_mail"], "left") \
    .select(
        persons["name"],
        persons["age"],
        # 使用coalesce函数:优先使用persons的serial_no,如果为null则使用people的s_no,
        # 如果两者都为null,则填充"NA"。
        coalesce(persons["serial_no"], people["s_no"], lit("NA")).alias("serial_no"),
        persons["mail"]
    )

print("根据邮箱填充序列号后的 DataFrame:")
serials_enriched.show()
登录后复制

serials_enriched DataFrame:

+------+---+---------+----------------+
|  name|age|serial_no|            mail|
+------+---+---------+----------------+
|  John| 25|   100483|john@example.com|
|   Sam| 49|   448900| sam@example.com|
|  Will| 63|   229809|will@example.com|
|Robert| 20|   299011|            NULL|
|  Hill| 78|   567233|hill@example.com|
+------+---+---------+----------------+
登录后复制

在这一步中,Will的serial_no成功从people中获取到229809。Robert的mail是NULL,所以无法通过邮箱匹配,其serial_no保持不变(因为persons中已有)。

步骤二:根据序列号填充缺失的邮箱

接下来,我们使用上一步得到的结果serials_enriched,通过serial_no字段与people的s_no字段进行左连接,以尝试填充mail的缺失值。

# 步骤二:根据serial_no匹配,填充mail
mail_enriched = serials_enriched.alias("se") \
    .join(people.alias("pe2"), serials_enriched["serial_no"] == people["s_no"], "left") \
    .select(
        serials_enriched["name"],
        serials_enriched["age"],
        serials_enriched["serial_no"],
        # 使用coalesce函数:优先使用serials_enriched的mail,如果为null则使用people的e_mail,
        # 如果两者都为null,则填充"NA"。
        coalesce(serials_enriched["mail"], people["e_mail"], lit("NA")).alias("mail")
    )

print("最终填充后的 DataFrame:")
mail_enriched.show()
登录后复制

mail_enriched DataFrame (最终结果):

+------+---+---------+----------------+
|  name|age|serial_no|            mail|
+------+---+---------+----------------+
|  John| 25|   100483|john@example.com|
|   Sam| 49|   448900| sam@example.com|
|  Will| 63|   229809|will@example.com|
|Robert| 20|   299011|              NA|
|  Hill| 78|   567233|hill@example.com|
+------+---+---------+----------------+
登录后复制

在这一步中,Robert的mail成功被填充为NA,因为persons中mail为NULL,且people中对应的e_mail也为NULL。

完整代码示例

将上述两个步骤整合到一起,形成完整的解决方案:

from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, lit

# 初始化SparkSession
spark = SparkSession.builder.appName("FillMissingValues").getOrCreate()

# 创建persons DataFrame
data_persons = [
    ("John", 25, 100483, "john@example.com"),
    ("Sam", 49, 448900, "sam@example.com"),
    ("Will", 63, None, "will@example.com"),
    ("Robert", 20, 299011, None),
    ("Hill", 78, None, "hill@example.com")
]
columns_persons = ["name", "age", "serial_no", "mail"]
persons = spark.createDataFrame(data_persons, columns_persons)

# 创建people DataFrame
data_people = [
    ("John", 100483, "john@example.com"),
    ("Sam", 448900, "sam@example.com"),
    ("Will", 229809, "will@example.com"),
    ("Robert", 299011, None),
    ("Hill", 567233, "hill@example.com")
]
columns_people = ["name", "s_no", "e_mail"]
people = spark.createDataFrame(data_people, columns_people)

# 步骤一:根据mail匹配,填充serial_no
serials_enriched = persons.alias("p") \
    .join(people.alias("pe1"), persons["mail"] == people["e_mail"], "left") \
    .select(
        persons["name"],
        persons["age"],
        coalesce(persons["serial_no"], people["s_no"], lit("NA")).alias("serial_no"),
        persons["mail"]
    )

# 步骤二:根据serial_no匹配,填充mail
final_df = serials_enriched.alias("se") \
    .join(people.alias("pe2"), serials_enriched["serial_no"] == people["s_no"], "left") \
    .select(
        serials_enriched["name"],
        serials_enriched["age"],
        serials_enriched["serial_no"],
        coalesce(serials_enriched["mail"], people["e_mail"], lit("NA")).alias("mail")
    )

print("最终填充后的 DataFrame:")
final_df.show()

# 停止SparkSession
spark.stop()
登录后复制

注意事项与总结

  1. 左连接的选择: 使用left连接是关键,它确保persons DataFrame中的所有记录都被保留,即使在people DataFrame中没有匹配项。没有匹配的记录的来自people的列将显示为NULL,这正是coalesce函数处理的基础。
  2. coalesce函数: coalesce函数接受一列或多列作为参数,并返回第一个非NULL的值。如果所有参数都为NULL,则返回NULL。在本例中,我们还加入了lit("NA")作为最后一个参数,以确保在所有匹配尝试失败时,缺失值被填充为字符串"NA"。
  3. 别名(Aliases): 在连接操作中为DataFrame使用别名(例如persons.alias("p")和people.alias("pe1"))是一个良好的实践,可以提高代码的可读性,并避免在多表连接时列名冲突。
  4. 列的引用: 在select语句中,需要明确指定要选择的列是来自哪个DataFrame的,尤其是在连接了多个DataFrame之后。例如,persons["name"]确保我们选择的是原始persons DataFrame中的name列。
  5. 重复数据处理: 如果people DataFrame中存在e_mail或s_no的重复值,并且这些重复值可能导致不确定的匹配结果(例如,一个邮箱对应多个序列号),那么在执行连接之前,可能需要对people DataFrame进行去重或聚合操作,以确保每个匹配键只对应一个唯一的值。
  6. 性能考量: 对于非常大的DataFrame,多次连接可能会带来性能开销。然而,对于这种复杂的条件填充逻辑,分步连接通常是清晰且高效的实现方式。Spark的优化器通常能很好地处理这些连接操作。

通过上述方法,我们能够灵活且精确地处理DataFrame中的缺失值填充问题,即使填充逻辑涉及多个匹配条件和源数据表。这种模式在数据清洗和特征工程中非常实用。

以上就是使用PySpark合并DataFrame并有条件地填充缺失值的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习
PHP中文网抖音号
发现有趣的

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号