PySpark DataFrame缺失值智能填充策略:基于多条件连接

聖光之護
发布: 2025-10-12 13:31:17
原创
997人浏览过

PySpark DataFrame缺失值智能填充策略:基于多条件连接

本文详细介绍了如何在pyspark中通过多条件连接(multiple conditional joins)和`coalesce`函数,智能地填充一个dataframe中依赖于另一个dataframe的缺失值。教程演示了如何针对不同缺失字段(如序列号和邮件)采用不同的连接键进行分步填充,并处理无匹配情况,确保数据完整性和准确性。

问题描述

在数据处理过程中,我们经常遇到需要从一个数据源(或DataFrame)中获取信息来补充另一个数据源中的缺失值的情况。本教程将解决一个具体场景:给定两个DataFrame,persons 和 people,我们需要根据特定的业务逻辑填充 persons DataFrame中 serial_no 和 mail 列的缺失值。

具体要求如下:

  1. 如果 persons DataFrame中的 serial_no 缺失,则尝试通过 mail 列与 people DataFrame的 e_mail 列进行连接,以获取 people DataFrame中的 s_no 值来填充 serial_no。
  2. 如果 persons DataFrame中的 mail 缺失,则尝试通过 serial_no 列(可能是原始值,也可能是第一步填充后的值)与 people DataFrame的 s_no 列进行连接,以获取 people DataFrame中的 e_mail 值来填充 mail。
  3. 如果以上两种连接都未能找到匹配值,则将缺失值填充为字符串 "NA"。

数据准备

首先,我们创建两个示例PySpark DataFrame来模拟 persons 和 people 数据。

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce, lit

# 初始化SparkSession
spark = SparkSession.builder.appName("DataFrameMissingValueFill").getOrCreate()

# 创建persons DataFrame
data_persons = [
    ("John", 25, 100483, "john@example.com"),
    ("Sam", 49, 448900, "sam@example.com"),
    ("Will", 63, None, "will@example.com"), # serial_no 缺失
    ("Robert", 20, 299011, None), # mail 缺失
    ("Hill", 78, None, "hill@example.com") # serial_no 缺失
]
columns_persons = ["name", "age", "serial_no", "mail"]
persons = spark.createDataFrame(data_persons, columns_persons)

# 创建people DataFrame
data_people = [
    ("John", 100483, "john@example.com"),
    ("Sam", 448900, "sam@example.com"),
    ("Will", 229809, "will@example.com"),
    ("Robert", 299011, None),
    ("Hill", 567233, "hill@example.com")
]
columns_people = ["name", "s_no", "e_mail"]
people = spark.createDataFrame(data_people, columns_people)

print("原始 persons DataFrame:")
persons.show()
print("原始 people DataFrame:")
people.show()
登录后复制

输出的原始DataFrame如下:

原始 persons DataFrame:
+------+---+---------+----------------+
|  name|age|serial_no|            mail|
+------+---+---------+----------------+
|  John| 25|   100483|john@example.com|
|   Sam| 49|   448900| sam@example.com|
|  Will| 63|     null|will@example.com|
|Robert| 20|   299011|            null|
|  Hill| 78|     null|hill@example.com|
+------+---+---------+----------------+

原始 people DataFrame:
+------+------+----------------+
|  name|  s_no|          e_mail|
+------+------+----------------+
|  John|100483|john@example.com|
|   Sam|448900| sam@example.com|
|  Will|229809|will@example.com|
|Robert|299011|            null|
|  Hill|567233|hill@example.com|
+------+------+----------------+
登录后复制

解决方案:分步连接与合并

为了满足上述复杂的填充逻辑,我们将采用分步连接(Sequential Joins)的方法。首先填充 serial_no,然后利用可能已更新的 serial_no 信息填充 mail。coalesce 函数在这里扮演了关键角色,它能够返回其参数列表中的第一个非空表达式。

步骤一:填充缺失的 serial_no

在这一步中,我们关注 persons DataFrame中 serial_no 列的缺失值。根据需求,如果 serial_no 缺失,我们尝试通过 persons.mail 与 people.e_mail 进行左连接来获取 people.s_no。

# 步骤一:通过邮件地址连接,填充缺失的 serial_no
# 使用别名避免列名冲突
serials_enriched = persons.alias("p").join(
    people.alias("pe"),
    col("p.mail") == col("pe.e_mail"), # 连接条件:persons的mail与people的e_mail
    "left" # 左连接,保留persons所有行
).select(
    col("p.name"),
    col("p.age"),
    # 使用coalesce函数:优先选择p.serial_no,其次是pe.s_no,最后是"NA"
    coalesce(col("p.serial_no"), col("pe.s_no"), lit("NA")).alias("serial_no"),
    col("p.mail")
)

print("填充 serial_no 后的 DataFrame:")
serials_enriched.show()
登录后复制

输出结果:

多墨智能
多墨智能

多墨智能 - AI 驱动的创意工作流写作工具

多墨智能 108
查看详情 多墨智能
填充 serial_no 后的 DataFrame:
+------+---+---------+----------------+
|  name|age|serial_no|            mail|
+------+---+---------+----------------+
|  John| 25|   100483|john@example.com|
|   Sam| 49|   448900| sam@example.com|
|  Will| 63|   229809|will@example.com|
|Robert| 20|   299011|            null|
|  Hill| 78|   567233|hill@example.com|
+------+---+---------+----------------+
登录后复制

可以看到,Will 和 Hill 的 serial_no 已经成功从 people DataFrame中获取并填充。Robert 的 serial_no 本身不缺失,所以保持不变。

步骤二:填充缺失的 mail

现在,我们使用在步骤一中已经填充了 serial_no 的 serials_enriched DataFrame。我们将它与 people DataFrame再次进行左连接,这次的连接条件是 serial_no 与 s_no。目标是填充 mail 列的缺失值。

# 步骤二:通过序列号连接,填充缺失的 mail
# 注意:这里使用上一步生成的 serials_enriched DataFrame
final_df = serials_enriched.alias("se").join(
    people.alias("pe"),
    col("se.serial_no") == col("pe.s_no"), # 连接条件:serials_enriched的serial_no与people的s_no
    "left" # 左连接,保留serials_enriched所有行
).select(
    col("se.name"),
    col("se.age"),
    col("se.serial_no"),
    # 使用coalesce函数:优先选择se.mail,其次是pe.e_mail,最后是"NA"
    coalesce(col("se.mail"), col("pe.e_mail"), lit("NA")).alias("mail")
)

print("最终填充后的 DataFrame:")
final_df.show()

# 停止SparkSession
spark.stop()
登录后复制

输出结果:

最终填充后的 DataFrame:
+------+---+---------+----------------+
|  name|age|serial_no|            mail|
+------+---+---------+----------------+
|  John| 25|   100483|john@example.com|
|   Sam| 49|   448900| sam@example.com|
|  Will| 63|   229809|will@example.com|
|Robert| 20|   299011|              NA|
|  Hill| 78|   567233|hill@example.com|
+------+---+---------+----------------+
登录后复制

最终结果显示,Robert 的 mail 列被填充为 "NA",因为 people DataFrame中与 Robert 的 s_no (299011) 对应的 e_mail 也是缺失的。其他行的 mail 值保持不变或已成功填充。

完整代码示例

为了方便,以下是整合了所有步骤的完整PySpark代码:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce, lit

# 初始化SparkSession
spark = SparkSession.builder.appName("DataFrameMissingValueFill").getOrCreate()

# 创建persons DataFrame
data_persons = [
    ("John", 25, 100483, "john@example.com"),
    ("Sam", 49, 448900, "sam@example.com"),
    ("Will", 63, None, "will@example.com"),
    ("Robert", 20, 299011, None),
    ("Hill", 78, None, "hill@example.com")
]
columns_persons = ["name", "age", "serial_no", "mail"]
persons = spark.createDataFrame(data_persons, columns_persons)

# 创建people DataFrame
data_people = [
    ("John", 100483, "john@example.com"),
    ("Sam", 448900, "sam@example.com"),
    ("Will", 229809, "will@example.com"),
    ("Robert", 299011, None),
    ("Hill", 567233, "hill@example.com")
]
columns_people = ["name", "s_no", "e_mail"]
people = spark.createDataFrame(data_people, columns_people)

print("原始 persons DataFrame:")
persons.show()
print("原始 people DataFrame:")
people.show()

# 步骤一:通过邮件地址连接,填充缺失的 serial_no
serials_enriched = persons.alias("p").join(
    people.alias("pe"),
    col("p.mail") == col("pe.e_mail"),
    "left"
).select(
    col("p.name"),
    col("p.age"),
    coalesce(col("p.serial_no"), col("pe.s_no"), lit("NA")).alias("serial_no"),
    col("p.mail")
)

print("填充 serial_no 后的 DataFrame:")
serials_enriched.show()

# 步骤二:通过序列号连接,填充缺失的 mail
final_df = serials_enriched.alias("se").join(
    people.alias("pe"),
    col("se.serial_no") == col("pe.s_no"),
    "left"
).select(
    col("se.name"),
    col("se.age"),
    col("se.serial_no"),
    coalesce(col("se.mail"), col("pe.e_mail"), lit("NA")).alias("mail")
)

print("最终填充后的 DataFrame:")
final_df.show()

# 停止SparkSession
spark.stop()
登录后复制

注意事项

  1. 连接顺序的重要性: 在本例中,填充 serial_no 的操作依赖于 mail,而填充 mail 的操作可能依赖于新填充的 serial_no。因此,连接的顺序至关重要。如果先尝试填充 mail,那么它可能无法利用到尚未填充的 serial_no 信息。
  2. people DataFrame中的重复值: 如果 people DataFrame中 e_mail 或 s_no 存在重复值,那么左连接可能会导致 persons DataFrame中的行被复制。在实际应用中,如果 people DataFrame可能包含重复的连接键,通常需要先对其进行去重或聚合,以确保一对一或一对多连接的预期行为。例如,可以使用 people.dropDuplicates(["e_mail"]) 或 people.groupBy("e_mail").agg(...)。
  3. coalesce 函数的灵活性: coalesce 函数非常强大,可以处理多个备选值。它会从左到右评估参数,并返回第一个非 null 的值。这使得在填充缺失值时能够设定优先级。
  4. 数据类型一致性: 确保在连接和合并操作中涉及的列具有兼容的数据类型。例如,serial_no (整数) 和 s_no (整数) 应该匹配,mail (字符串) 和 e_mail (字符串) 也应匹配。
  5. 性能考量: 对于非常大的DataFrame,多次连接操作可能会影响性能。可以考虑使用 broadcast hint (people.hint("broadcast")) 来优化小型DataFrame的连接,以减少数据混洗。

总结

本教程展示了如何利用PySpark的强大功能,通过多步左连接和 coalesce 函数,优雅且高效地解决DataFrame中复杂条件的缺失值填充问题。这种方法不仅能够处理多字段、多条件的填充需求,还能灵活应对无匹配的情况,确保最终数据的完整性和业务逻辑的准确性。掌握这种技术对于处理真实世界中的数据集成和清洗任务至关重要。

以上就是PySpark DataFrame缺失值智能填充策略:基于多条件连接的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号