
本教程详细介绍了如何在pyspark dataframe中对多个列应用多个聚合函数(如`min`和`max`),并将结果以行式结构呈现。通过分步演示,我们展示了如何利用`select`进行初步聚合,并结合`unionbyname`技巧将聚合结果重塑为易于分析的行式格式,适用于需要定制化聚合报告的场景。
在PySpark数据处理中,我们经常需要对DataFrame的多个列执行聚合操作,例如计算每个列的最小值和最大值。虽然PySpark的agg函数能够方便地进行多列多函数聚合,但其默认输出是将所有聚合结果并列在一行中。然而,在某些分析场景下,我们可能需要将不同聚合函数的结果以行(row-wise)的形式展示,即每一行代表一个聚合函数(如最小值、最大值),而列则对应原始DataFrame的列。本教程将详细介绍如何实现这种定制化的行式聚合输出。
假设我们有一个PySpark DataFrame,并希望计算其中所有数值列的最小值和最大值。一个常见的初步尝试可能是使用列表推导式结合agg函数:
from pyspark.sql import functions as F # 假设 df 是一个 PySpark DataFrame # exprs = [F.min(c).alias(c), F.max(c).alias(c) for c in df.columns] # df2 = df.agg(*exprs)
这种方法虽然可以计算出所有列的最小值和最大值,但其结果会是一个单行DataFrame,其中包含类似 min_col1, max_col1, min_col2, max_col2 等列。这与我们期望的“第一行是所有列的最小值,第二行是所有列的最大值”的行式输出格式不符。
为了实现行式聚合输出,我们需要采取一种分两步走的策略:
让我们通过一个具体的例子来演示这个过程。
首先,创建一个示例PySpark DataFrame:
import operator
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# 初始化 SparkSession
spark = SparkSession.builder.appName("PySparkMultiAggTutorial").getOrCreate()
_data = [
(4, 123, 18, 29),
(8, 5, 26, 187),
(2, 97, 18, 29),
]
_schema = ['col_1', 'col2', 'col3', 'col_4']
df = spark.createDataFrame(_data, _schema)
print("原始DataFrame:")
df.show()
# +-----+----+----+-----+
# |col_1|col2|col3|col_4|
# +-----+----+----+-----+
# | 4| 123| 18| 29|
# | 8| 5| 26| 187|
# | 2| 97| 18| 29|
# +-----+----+----+-----+我们首先为每个列生成min和max的聚合表达式,并使用df.select()来执行这些聚合。这里使用select而不是agg是因为select可以接受多个表达式作为参数,并直接创建新的列。
# 为每个列生成 min 和 max 聚合表达式
min_vals = [F.min(c).alias(f'min_{c}') for c in df.columns]
max_vals = [F.max(c).alias(f'max_{c}') for c in df.columns]
# 将所有聚合表达式合并,并使用 select 得到一个单行 DataFrame
# 注意:这里也可以使用 df.agg(*min_vals, *max_vals),效果类似
df_aggregated_single_row = df.select(min_vals + max_vals)
print("初步聚合后的单行DataFrame:")
df_aggregated_single_row.show()
# +-------+------+-------+--------+-------+-------+-------+--------+
# |min_col_1|min_col2|min_col3|min_col_4|max_col_1|max_col2|max_col3|max_col_4|
# +-------+------+-------+--------+-------+-------+-------+--------+
# | 2| 5| 18| 29| 8| 123| 26| 187|
# +-------+------+-------+--------+-------+-------+-------+--------+注意事项: 如果df_aggregated_single_row后续会被多次使用,为了优化性能,建议对其进行cache()操作:df_aggregated_single_row.cache()。
现在我们有了包含所有聚合结果的单行DataFrame (df_aggregated_single_row)。接下来,我们需要将其重塑为期望的行式输出。这涉及到为每种聚合类型(如min和max)创建单独的DataFrame,并添加一个标识聚合类型的列,然后通过unionByName合并它们。
# 1. 创建 min 结果的 DataFrame
# - 添加 'agg_type' 列标识为 'min'
# - 重命名聚合列回原始列名
min_cols = operator.add(
[F.lit('min').alias('agg_type')], # 添加聚合类型标识列
[F.col(f'min_{c}').alias(c) for c in df.columns] # 选择并重命名 min_xxx 列
)
min_df = df_aggregated_single_row.select(min_cols)
# 2. 创建 max 结果的 DataFrame
# - 添加 'agg_type' 列标识为 'max'
# - 重命名聚合列回原始列名
max_cols = operator.add(
[F.lit('max').alias('agg_type')], # 添加聚合类型标识列
[F.col(f'max_{c}').alias(c) for c in df.columns] # 选择并重命名 max_xxx 列
)
max_df = df_aggregated_single_row.select(max_cols)
# 3. 使用 unionByName 合并 min_df 和 max_df
# unionByName 要求两个 DataFrame 具有相同的列名和类型
result_df = min_df.unionByName(max_df)
print("\n最终行式聚合结果:")
result_df.show()
# +--------+-----+----+----+-----+
# |agg_type|col_1|col2|col3|col_4|
# +--------+-----+----+----+-----+
# | min| 2| 5| 18| 29|
# | max| 8| 123| 26| 187|
# +--------+-----+----+----+-----+这里的operator.add用于连接两个列表,它与直接使用+的效果相同,例如 [F.lit('min').alias('agg_type')] + [F.col(f'min_{c}').alias(c) for c in df.columns]。
这种方法提供了一个灵活且强大的模式,用于在PySpark中实现复杂的行式聚合输出。
通过掌握这种技巧,开发者可以更灵活地控制PySpark聚合结果的呈现方式,以满足各种数据分析和报告的需求。
以上就是PySpark DataFrame多列多函数聚合与行式结果呈现的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号