
在数据清洗和预处理过程中,我们经常会遇到包含重复记录的数据集。这些重复记录可能基于一个或多个列的组合,但我们往往需要为每个重复组保留特定数量的记录,例如,只保留每个重复组中最新的n条记录。例如,在一个包含用户活动记录的dataframe中,我们可能希望针对每个用户(由first_name, last_name, sex等列定义),只保留其最新的3条活动记录。
假设我们有如下一个DataFrame:
| id | first_name | last_name | sex | country |
|---|---|---|---|---|
| 01 | John | Doe | Male | USA |
| 02 | John | Doe | Male | Canada |
| 03 | John | Doe | Male | Mexico |
| 04 | Mark | Kay | Male | Italy |
| 05 | John | Doe | Male | Spain |
| 06 | Mark | Kay | Male | France |
| 07 | John | Doe | Male | Peru |
| 08 | Mark | Kay | Male | India |
| 09 | Mark | Kay | Male | Laos |
| 10 | John | Doe | Male | Benin |
目标是基于first_name、last_name和sex列的组合识别重复项,并为每个组合保留最新的3条记录(根据id列的降序)。
对于在内存中操作的Pandas DataFrame,groupby().tail()方法提供了一种非常简洁且高效的解决方案。
import pandas as pd
# 原始DataFrame数据
data = {
'id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
'first_name': ['John', 'John', 'John', 'Mark', 'John', 'Mark', 'John', 'Mark', 'Mark', 'John'],
'last_name': ['Doe', 'Doe', 'Doe', 'Kay', 'Doe', 'Kay', 'Doe', 'Kay', 'Kay', 'Doe'],
'sex': ['Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male'],
'country': ['USA', 'Canada', 'Mexico', 'Italy', 'Spain', 'France', 'Peru', 'India', 'Laos', 'Benin']
}
df = pd.DataFrame(data)
# 1. 根据'id'列对DataFrame进行排序,确保'tail(3)'能获取到最新的3条记录
# 如果'id'本身就是递增的,此步骤可确保正确性。
df_sorted = df.sort_values(by='id').copy()
# 2. 按照指定列进行分组,并为每个组保留最后3条记录
result_df = df_sorted.groupby(['first_name', 'last_name', 'sex']).tail(3)
# 3. (可选)重置索引,使索引连续
result_df = result_df.reset_index(drop=True)
# 显示结果DataFrame
print("处理后的DataFrame:")
print(result_df)输出结果:
处理后的DataFrame: id first_name last_name sex country 0 5 John Doe Male Spain 1 6 Mark Kay Male France 2 7 John Doe Male Peru 3 8 Mark Kay Male India 4 9 Mark Kay Male Laos 5 10 John Doe Male Benin
对于大规模分布式数据集,例如在Apache Spark环境中使用PySpark,groupby().tail()方法不再适用。此时,窗口函数(Window Functions)是实现此功能的标准且高效的方式。
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# 假设df是一个Spark DataFrame
# 这里为了示例,我们创建一个模拟的SparkSession和DataFrame
spark = SparkSession.builder.appName("FilterDuplicates").getOrCreate()
data = [
(1, 'John', 'Doe', 'Male', 'USA'),
(2, 'John', 'Doe', 'Male', 'Canada'),
(3, 'John', 'Doe', 'Male', 'Mexico'),
(4, 'Mark', 'Kay', 'Male', 'Italy'),
(5, 'John', 'Doe', 'Male', 'Spain'),
(6, 'Mark', 'Kay', 'Male', 'France'),
(7, 'John', 'Doe', 'Male', 'Peru'),
(8, 'Mark', 'Kay', 'Male', 'India'),
(9, 'Mark', 'Kay', 'Male', 'Laos'),
(10, 'John', 'Doe', 'Male', 'Benin')
]
columns = ['id', 'first_name', 'last_name', 'sex', 'country']
df_spark = spark.createDataFrame(data, columns)
# 定义窗口规范:按first_name, last_name, sex分组,按id降序排序
window_spec = Window.partitionBy('first_name', 'last_name', 'sex').orderBy(F.desc('id'))
# 为每个分区内的记录生成行号
df_with_row_number = df_spark.withColumn('row_number', F.row_number().over(window_spec))
# 过滤,只保留行号小于等于3的记录
filtered_df_spark = df_with_row_number.filter('row_number <= 3')
# 移除辅助列row_number
result_df_spark = filtered_df_spark.drop('row_number')
# 显示结果
print("处理后的Spark DataFrame:")
result_df_spark.show()
spark.stop()输出结果:
处理后的Spark DataFrame: +---+----------+---------+----+-------+ | id|first_name|last_name| sex|country| +---+----------+---------+----+-------+ | 5| John| Doe|Male| Spain| | 7| John| Doe|Male| Peru| | 10| John| Doe|Male| Benin| | 6| Mark| Kay|Male| France| | 8| Kay | Kay|Male| India| | 9| Mark| Kay|Male| Laos| +---+----------+---------+----+-------+
本文详细介绍了在数据处理中,如何根据特定分组筛选重复记录并保留指定数量(N)的最新数据。对于内存中的数据集,Pandas的df.sort_values().groupby().tail(N)组合方法提供了一个简洁高效的解决方案。而对于分布式大数据集,PySpark的窗口函数(Window.partitionBy().orderBy().row_number())则是实现相同逻辑的标准且高性能途径。理解这两种方法的适用场景和实现原理,能帮助开发者根据实际需求选择最合适的工具和策略,从而高效地完成数据清洗任务。
以上就是Pandas数据处理:高效筛选重复记录并保留指定数量的最新数据的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号