
在数据分析和处理过程中,我们经常会遇到包含重复记录的数据集。虽然有时需要完全删除重复项,但在某些场景下,我们可能希望保留每组重复项中的特定数量,例如最新的n条记录。本文将深入探讨如何使用python的pandas库和pyspark框架,高效地实现这一目标。
假设我们有一个包含用户活动的数据帧,其中first_name、last_name和sex组合可能存在重复,但id和country是唯一的。我们的目标是针对每个重复的用户组合(由first_name、last_name和sex定义),只保留其最新的3条记录。这里的“最新”通常根据某个时间戳或递增的ID列来定义。
原始数据帧示例:
| id | first_name | last_name | sex | country |
|---|---|---|---|---|
| 01 | John | Doe | Male | USA |
| 02 | John | Doe | Male | Canada |
| 03 | John | Doe | Male | Mexico |
| 04 | Mark | Kay | Male | Italy |
| 05 | John | Doe | Male | Spain |
| 06 | Mark | Kay | Male | France |
| 07 | John | Doe | Male | Peru |
| 08 | Mark | Kay | Male | India |
| 09 | Mark | Kay | Male | Laos |
| 10 | John | Doe | Male | Benin |
期望结果(保留每组重复项的最后3条,基于id排序):
| id | first_name | last_name | sex | country |
|---|---|---|---|---|
| 05 | John | Doe | Male | Spain |
| 06 | Mark | Kay | Male | France |
| 07 | John | Doe | Male | Peru |
| 08 | Mark | Kay | Male | India |
| 09 | Mark | Kay | Male | Laos |
| 10 | John | Doe | Male | Benin |
对于中小型数据集,Pandas提供了一个非常简洁且高效的方法来解决这个问题,即结合groupby()和tail()。
import pandas as pd
# 示例数据帧
data = {
'id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
'first_name': ['John', 'John', 'John', 'Mark', 'John', 'Mark', 'John', 'Mark', 'Mark', 'John'],
'last_name': ['Doe', 'Doe', 'Doe', 'Kay', 'Doe', 'Kay', 'Doe', 'Kay', 'Kay', 'Doe'],
'sex': ['Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male'],
'country': ['USA', 'Canada', 'Mexico', 'Italy', 'Spain', 'France', 'Peru', 'India', 'Laos', 'Benin']
}
df = pd.DataFrame(data)
print("原始数据帧:")
print(df)
# 步骤1: 根据 'id' 列对数据帧进行排序,确保“最新”的定义是正确的
# 默认升序,即较大的ID代表更新的记录
df_sorted = df.sort_values(by='id')
# 步骤2: 根据重复键进行分组,并对每个组保留最后3条记录
result_df = df_sorted.groupby(['first_name', 'last_name', 'sex']).tail(3)
# 步骤3: 重置索引(可选,但通常推荐,使索引连续)
result_df = result_df.reset_index(drop=True)
print("\n处理后的数据帧:")
print(result_df)对于大规模数据集,PySpark提供了分布式处理能力,其窗口函数是处理此类问题的强大工具。
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# 初始化SparkSession
spark = SparkSession.builder.appName("FilterDuplicatesSpark").getOrCreate()
# 示例数据
data = [
(1, 'John', 'Doe', 'Male', 'USA'),
(2, 'John', 'Doe', 'Male', 'Canada'),
(3, 'John', 'Doe', 'Male', 'Mexico'),
(4, 'Mark', 'Kay', 'Male', 'Italy'),
(5, 'John', 'Doe', 'Male', 'Spain'),
(6, 'Mark', 'Kay', 'Male', 'France'),
(7, 'John', 'Doe', 'Male', 'Peru'),
(8, 'Mark', 'Kay', 'Male', 'India'),
(9, 'Mark', 'Kay', 'Male', 'Laos'),
(10, 'John', 'Doe', 'Male', 'Benin')
]
columns = ['id', 'first_name', 'last_name', 'sex', 'country']
df_spark = spark.createDataFrame(data, columns)
print("原始Spark数据帧:")
df_spark.show()
# 步骤1: 定义窗口规范
# partitionBy: 根据哪些列来分组
# orderBy: 在每个分组内,根据哪些列进行排序。F.desc('id') 表示按id降序,以便row_number为1的是最新的记录。
window_spec = Window.partitionBy('first_name', 'last_name', 'sex').orderBy(F.desc('id'))
# 步骤2: 使用row_number()为每个分组内的记录分配行号
df_with_row_number = df_spark.withColumn('row_number', F.row_number().over(window_spec))
print("\n添加行号后的Spark数据帧:")
df_with_row_number.show()
# 步骤3: 筛选出row_number小于等于3的记录,即每个分组的最新3条
filtered_df = df_with_row_number.filter('row_number <= 3')
# 步骤4: 移除辅助列row_number
result_df_spark = filtered_df.drop('row_number')
print("\n处理后的Spark数据帧:")
result_df_spark.show()
# 停止SparkSession
spark.stop()选择建议:
本文详细介绍了在Python数据生态中处理数据帧重复记录,并保留指定数量最新记录的两种主要方法:Pandas的groupby().tail()和PySpark的窗口函数。Pandas方案适用于中小型数据集,以其简洁性著称;而PySpark方案则为大规模分布式数据处理提供了高效且可扩展的解决方案。理解这两种方法的原理、适用场景及注意事项,将有助于您在实际数据处理工作中做出明智的技术选择,从而更有效地管理和清洗数据。
以上就是数据帧重复记录筛选:高效保留指定数量的最新数据的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号