
在数据处理过程中,我们经常会遇到dataframe中存在序列性缺失值的情况。例如,在一个包含 row_id 和 group_id 的pyspark dataframe中,row_id 是一个递增且唯一的序列号,而 group_id 则表示一个组的唯一标识。当 group_id 首次出现一个非空值时,它标志着一个新组的开始,此后的 null 值都应填充为该组的起始 group_id,直到下一个非空 group_id 出现。
例如,原始数据可能如下:
row_id, group_id 1, 1 2, null 3, null 4, null 5, 5 6, null 7, null 8, 8 ...
我们的目标是将其转换为:
row_id, group_id 1, 1 2, 1 3, 1 4, 1 5, 5 6, 5 7, 5 8, 8 ...
这种填充需求在大规模数据集(例如数百万甚至数十亿条记录)上需要高效的解决方案。
PySpark的窗口函数提供了一种强大的机制,允许我们在DataFrame的特定“窗口”内执行计算。一个窗口定义了一组与当前行相关的行,并且可以根据一个或多个列进行排序。在处理序列性数据和聚合操作时,窗口函数表现出卓越的灵活性和性能。
本教程将利用以下窗口函数特性:
解决此问题的关键在于正确定义窗口规范,并利用 last 函数在窗口内获取最近的非空 group_id。
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# 1. 创建一个SparkSession
spark = SparkSession.builder.appName("SequentialNullFill").getOrCreate()
# 2. 准备示例DataFrame
data = [
(1, 1), (2, None), (3, None), (4, None),
(5, 5), (6, None), (7, None),
(8, 8), (9, None), (10, None), (11, None), (12, None)
]
columns = ["row_id", "group_id"]
df = spark.createDataFrame(data, columns)
print("原始DataFrame:")
df.show()
# 3. 定义窗口规范
# 窗口按 row_id 升序排列
# 范围是从分区开始到当前行(包括当前行)
windowSpec = Window.orderBy("row_id").rowsBetween(Window.unboundedPreceding, 0)
# 4. 使用 last 窗口函数填充 null 值
# ignorenulls=True 确保只考虑非空值
filled_df = df.withColumn(
"group_id",
F.last("group_id", ignorenulls=True).over(windowSpec)
)
print("填充缺失值后的DataFrame:")
filled_df.show()
# 停止SparkSession
spark.stop()原始DataFrame: +------+--------+ |row_id|group_id| +------+--------+ | 1| 1| | 2| null| | 3| null| | 4| null| | 5| 5| | 6| null| | 7| null| | 8| 8| | 9| null| | 10| null| | 11| null| | 12| null| +------+--------+ 填充缺失值后的DataFrame: +------+--------+ |row_id|group_id| +------+--------+ | 1| 1| | 2| 1| | 3| 1| | 4| 1| | 5| 5| | 6| 5| | 7| 5| | 8| 8| | 9| 8| | 10| 8| | 11| 8| | 12| 8| +------+--------+
本教程详细阐述了如何在PySpark DataFrame中,利用窗口函数 (Window) 结合 last 函数和 ignorenulls=True 参数,高效地实现序列性缺失值填充。通过定义正确的窗口规范 (Window.orderBy("row_id").rowsBetween(Window.unboundedPreceding, 0)),我们能够将 group_id 列中的 null 值填充为其所在序列中最近的非空值,从而满足数据连续性的需求。此方法在处理大规模数据集时表现出良好的性能和扩展性,是PySpark数据清洗和预处理中的一个重要技巧。
以上就是PySpark中利用窗口函数按序填充DataFrame缺失值的高效策略的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号