
在数据分析中,我们经常需要对dataframe中的每一行,根据其特定属性(如商店id)和时间戳,回溯查询满足特定条件的历史数据并进行聚合计算。例如,对于每一笔交易,我们可能需要计算在过去15天内同一商店某种特定商品的出现次数。
传统的实现方式通常涉及使用循环(如iterrows())遍历DataFrame的每一行,然后在循环内部对DataFrame进行过滤和计算。这种方法虽然直观,但对于大型数据集而言,其性能瓶颈非常明显,因为它涉及大量的重复数据筛选操作,且Python循环的效率远低于底层优化的Pandas操作。
考虑以下一个典型的低效实现示例:
import pandas as pd
import numpy as np
from tqdm import tqdm
# 示例数据
data = pd.DataFrame({
'shop': [1, 2, 1, 2, 1, 2, 1, 2, 1, 2] * 3,
'execution_date': pd.to_datetime(pd.date_range(start='2023-11-01', periods=30, freq='D')),
'item': np.random.choice(['stuff', 'other', 'another'], size=30)
})
data.loc[data.index % 3 == 0, 'item'] = 'stuff'
data.loc[data.index % 7 == 0, 'item'] = 'stuff'
cum_items = []
data.execution_date = pd.to_datetime(data.execution_date, errors="coerce")
subset = data[["shop", "execution_date", "item"]]
# 预处理:将需要计数的条件转换为数值(1或0)
# 假设我们要计算 'item' 列中 'stuff' 的出现次数
subset['is_stuff'] = (subset['item'] == 'stuff').astype(int)
for i, row in tqdm(subset.iterrows(), total=len(subset)):
# 过滤条件:同一商店,且日期严格早于当前日期减去15天
fdf = (subset
.loc[(subset.shop == row["shop"])]
.loc[subset.execution_date < (row["execution_date"] - pd.Timedelta(15, unit='d'))]
)
if len(fdf) == 0:
cum_items.append(np.nan)
else:
cum_items.append(fdf['is_stuff'].sum()) # 计算 'stuff' 的出现次数
data["cum_items_iter"] = cum_items
print("迭代计算结果 (部分):")
print(data[['shop', 'execution_date', 'item', 'cum_items_iter']].head(10))上述代码通过逐行遍历,为每行构建一个子DataFrame并执行过滤和求和操作。对于包含数万甚至数百万行的数据集,这种方法将导致极长的运行时间。
为了显著提升性能,我们需要利用Pandas的向量化操作。groupby_rolling是处理此类分组-时间窗口聚合问题的强大工具。它允许我们对DataFrame进行分组,并在每个组内定义一个滑动时间窗口进行计算,从而避免显式循环。
核心思想:
下面是使用groupby_rolling实现上述需求的向量化代码:
import pandas as pd
import numpy as np
# 示例数据(与上文相同)
data = pd.DataFrame({
'shop': [1, 2, 1, 2, 1, 2, 1, 2, 1, 2] * 3,
'execution_date': pd.to_datetime(pd.date_range(start='2023-11-01', periods=30, freq='D')),
'item': np.random.choice(['stuff', 'other', 'another'], size=30)
})
data.loc[data.index % 3 == 0, 'item'] = 'stuff'
data.loc[data.index % 7 == 0, 'item'] = 'stuff'
# 1. 确保日期列为datetime类型
data.execution_date = pd.to_datetime(data.execution_date, errors="coerce")
# 2. 预处理:将需要计数的条件转换为数值(1或0)
# 假设我们要计算 'item' 列中 'stuff' 的出现次数
data['is_stuff'] = (data['item'] == 'stuff').astype(int)
# 3. 排序:按商店和日期排序,这对于滚动操作至关重要
# 注意:groupby_rolling 可能会打乱原始顺序,因此在计算前排序并保留原始索引是好习惯
subset_sorted = data.sort_values(['shop', 'execution_date']).copy()
# 4. 使用 groupby_rolling 进行向量化计算
# '15D' 定义了一个15天的窗口,包括当前日期并向前追溯15天
# on='execution_date' 指定了滚动基于的日期列
data['cum_items_vec'] = (subset_sorted.groupby('shop')
.rolling('15D', on='execution_date')['is_stuff']
.sum()
.reset_index(level=0, drop=True) # 移除groupby引入的shop级别索引
.reindex(data.index) # 将结果重新对齐到原始DataFrame的索引
)
# 由于滚动窗口默认包含当前点,如果需要排除当前点,可以进一步处理
# 例如,如果'is_stuff'在当前日期为1,则减去它
# data['cum_items_vec'] = data['cum_items_vec'] - data['is_stuff']
print("\n向量化计算结果 (部分):")
print(data[['shop', 'execution_date', 'item', 'is_stuff', 'cum_items_vec']].head(10))
# 比较迭代和向量化结果(如果窗口定义一致)
# 注意:本例中,迭代方法和向量化方法对时间窗口的定义略有不同
# 迭代方法:D' < D - 15天 (严格早于15天前)
# 向量化方法:[D - 15天, D] (过去15天,包含当前日期)
# 因此,结果通常不会完全相同,但向量化方法解决了“过去N天”的常见需求关于时间窗口的说明: 原问题中迭代方法的条件是 subset.execution_date < (row["execution_date"] - pd.Timedelta(15, unit='d')),这意味着它查找严格早于当前日期15天前的所有记录。而rolling('15D', on='execution_date')所定义的窗口是[当前日期 - 15天, 当前日期],即包含当前日期在内的过去15天。虽然这两种窗口定义略有不同,但groupby_rolling提供的“过去N天”窗口计算是时间序列分析中更常见且高效的向量化模式。如果严格需要原问题中的窗口定义,可能需要更复杂的rolling配置或结合其他操作。
通过将低效的iterrows()循环替换为Pandas的groupby_rolling函数,我们能够实现对DataFrame中条件滚动累加计算的显著性能提升。这种向量化方法不仅代码更简洁,而且利用了Pandas底层C语言优化,极大地加快了大数据集的处理速度。掌握groupby_rolling是进行高效时间序列数据分析的关键技能之一。
以上就是Pandas中条件滚动累加的向量化实现的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号