
本文探讨了在pandas dataframe中高效查找满足特定条件的最新历史索引的方法。针对传统`df.apply`方法的性能瓶颈,文章详细介绍了基于python内置`bisect`模块的优化方案。通过对比多种实现,重点展示了`bisect`在处理大规模数据集时显著的性能优势,并提供了详细的代码示例与解释,旨在帮助读者提升pandas数据处理效率。
在数据分析和处理中,我们经常会遇到需要基于当前行数据,从历史数据中查找满足特定条件的记录,并获取其相关信息(例如索引或日期)的场景。一个典型的例子是:给定一个DataFrame,其中包含“lower”和“upper”两列以及一个日期索引,我们需要为每一行找到其之前所有行中,“lower”值大于等于当前行“upper”值的最新记录的日期。
考虑以下Pandas DataFrame示例,其中包含lower和upper两列,并以日期作为索引:
import pandas as pd
import numpy as np
# 示例 DataFrame
data = {'lower': [7, 1, 6, 1, 1, 1, 1, 11, 1, 1],
'upper': [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]}
df = pd.DataFrame(data=data)
df['DATE'] = pd.date_range('2020-01-01', periods=len(data['lower']))
df.set_index('DATE', inplace=True)
print("原始DataFrame:")
print(df)输出:
原始DataFrame:
lower upper
DATE
2020-01-01 7 2
2020-01-02 1 3
2020-01-03 6 4
2020-01-04 1 5
2020-01-05 1 6
2020-01-06 1 7
2020-01-07 1 8
2020-01-08 11 9
2020-01-09 1 10
2020-01-10 1 11我们的目标是添加一个新列prev,其中包含满足条件previous_row['lower'] >= current_row['upper']的最新历史记录的DATE。
一种直观但效率低下的实现方式是使用df.apply()结合df.loc进行行迭代:
def get_most_recent_index_baseline(row, dataframe):
# 查找当前行之前的行
# 注意:row.name - pd.Timedelta(minutes=1) 假设索引是分钟频率,确保不包含当前行
previous_indices = dataframe.loc[:row.name - pd.Timedelta(minutes=1)]
# 在之前的行中筛选满足条件的记录,并获取其最大(即最新)索引
recent_index = previous_indices[previous_indices['lower'] >= row['upper']].index.max()
return recent_index
# 应用函数
# df['prev'] = df.apply(lambda row: get_most_recent_index_baseline(row, df), axis=1)
# print("\n使用df.apply()的结果:")
# print(df)这种方法的问题在于,df.apply(axis=1)本质上是逐行迭代,并且在每次迭代中,previous_indices = dataframe.loc[:row.name - pd.Timedelta(minutes=1)]会创建一个DataFrame的切片副本,然后进行条件筛选和max()操作。对于大型DataFrame,这种重复的切片和操作会导致极高的计算开销,性能非常差。
为了量化性能差异,我们通常会使用一个更大的数据集进行基准测试。以下是一个生成测试数据的函数和基准测试的设置:
def get_sample_df(rows=100_000):
# Sample DataFrame
data = {'lower': np.random.default_rng(seed=1).uniform(1,100,rows),
'upper': np.random.default_rng(seed=2).uniform(1,100,rows)}
df = pd.DataFrame(data=data)
df = df.astype(int)
df['DATE'] = pd.date_range('2020-01-01', periods=len(data['lower']), freq="min")
df.set_index('DATE', inplace=True)
return df
# get_baseline 函数封装了上述df.apply()的逻辑
def get_baseline():
df = get_sample_df()
def get_most_recent_index(row):
previous_indices = df.loc[:row.name - pd.Timedelta(minutes=1)]
recent_index = previous_indices[previous_indices['lower'] >= row['upper']].index.max()
return recent_index
df['prev'] = df.apply(get_most_recent_index, axis=1)
return df
# 基准测试结果 (针对100,000行数据)
# baseline: 1min 35s ± 5.15 s per loop (mean ± std. dev. of 2 runs, 2 loops each)从基准测试结果可以看出,对于10万行数据,df.apply()方法需要大约1分35秒,这在实际应用中是不可接受的。因此,我们需要寻找更高效的算法来解决这个问题。
由于问题涉及到依赖于过去状态的查找,完全的Pandas矢量化操作可能难以直接实现。然而,我们可以通过结合Python的bisect模块和自定义迭代逻辑来大幅提升性能。bisect模块实现了二分查找算法,可以在有序序列中高效地查找元素插入点。
核心思想是:
以下是使用bisect实现的优化方案:
from bisect import bisect_left
def get_bisect():
df = get_sample_df() # 使用与基准测试相同的样本数据
def get_prev_bs(lower_series, upper_series, date_index):
# 获取所有唯一的lower值并排序,用于二分查找
uniq_lower = sorted(set(lower_series))
# 存储每个lower值最近出现的日期
last_seen = {}
# 迭代DataFrame的每一行
for l, u, d in zip(lower_series, upper_series, date_index):
# 使用bisect_left找到在uniq_lower中第一个大于等于u的元素的索引
# 这意味着uniq_lower[idx:]包含了所有可能满足条件 lower >= u 的值
idx = bisect_left(uniq_lower, u)
max_date = None
# 遍历所有满足条件的lower值
for lv in uniq_lower[idx:]:
# 如果这个lower值之前出现过
if lv in last_seen:
# 更新max_date为最近的日期
if max_date is None:
max_date = last_seen[lv]
elif last_seen[lv] > max_date:
max_date = last_seen[lv]
# 返回当前行的结果
yield max_date
# 更新last_seen字典:当前lower值l的最新日期是d
last_seen[l] = d
df["prev"] = list(get_prev_bs(df["lower"], df["upper"], df.index))
return df
# 基准测试结果 (针对100,000行数据)
# bisect: 1.76 s ± 82.5 ms per loop (mean ± std. dev. of 2 runs, 2 loops each)
# 打印使用bisect方法的示例结果 (使用小规模df)
def get_bisect_example():
data = {'lower': [7, 1, 6, 1, 1, 1, 1, 11, 1, 1],
'upper': [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]}
df = pd.DataFrame(data=data)
df['DATE'] = pd.date_range('2020-01-01', periods=len(data['lower']))
df.set_index('DATE', inplace=True)
def get_prev_bs_example(lower_series, upper_series, date_index):
uniq_lower = sorted(set(lower_series))
last_seen = {}
for l, u, d in zip(lower_series, upper_series, date_index):
idx = bisect_left(uniq_lower, u)
max_date = None
for lv in uniq_lower[idx:]:
if lv in last_seen:
if max_date is None:
max_date = last_seen[lv]
elif last_seen[lv] > max_date:
max_date = last_seen[lv]
yield max_date
last_seen[l] = d
df["prev"] = list(get_prev_bs_example(df["lower"], df["upper"], df.index))
return df
print("\n使用bisect()方法的结果:")
print(get_bisect_example())输出:
使用bisect()方法的结果:
lower upper prev
DATE
2020-01-01 7 2 NaT
2020-01-02 1 3 2020-01-01
2020-01-03 6 4 2020-01-01
2020-01-04 1 5 2020-01-03
2020-01-05 1 6 2020-01-03
2020-01-06 1 7 2020-01-01
2020-01-07 1 8 NaT
2020-01-08 11 9 NaT
2020-01-09 1 10 2020-01-08
2020-01-10 1 11 2020-01-08从基准测试结果来看,bisect方法将处理10万行数据的时间从1分35秒大幅缩短至约1.76秒,性能提升了近50倍,这在处理大规模数据集时是决定性的优势。
在解决这类问题时,通常会有多种尝试,但并非所有都适用于所有场景:
pyjanitor是一个提供整洁API的Pandas扩展库,其中包含conditional_join等高级功能,可以用于执行条件连接操作。理论上,它可能提供一种矢量化的解决方案。
# def get_pyjanitor():
# df = get_sample_df()
# df.reset_index(inplace=True)
# left_df = df.assign(index_prev=df.index)
# right_df = df.assign(index_next=df.index)
# out=(left_df
# .conditional_join(
# right_df,
# ('lower','upper','>='),
# ('index_prev','index_next','<'),
# df_columns='index_prev',
# right_columns=['index_next','lower','upper'])
# )
# # 后续处理逻辑以找到最近的匹配项
# # ...
# return df
# 基准测试结果:
# Unable to allocate 37.2 GiB for an array with shape (4994299505,) and data type int64尽管pyjanitor提供了一种结构化的方法,但在实际测试中,对于中等规模以上的数据集(例如10万行),conditional_join操作可能会导致巨大的内存分配错误(如上述Unable to allocate 37.2 GiB),使其在内存受限的环境下不可用。这是因为条件连接可能产生一个非常大的中间结果集。
另一种尝试是使用enumerate和列表操作进行迭代。这种方法与df.apply类似,也是逐行处理,但可能通过直接操作Python列表来避免Pandas内部的一些开销。
# def get_enumerate(): # df = get_sample_df() # df.reset_index(inplace=True) # date_list=df["DATE"].values.tolist() # lower_list=df["lower"].values.tolist() # upper_list=df["upper"].values.tolist() # new_list=[] # for i,(x,y) in enumerate(zip(lower_list,upper_list)): # if i==0: # new_list.append(None) # else: # if (any(j >= y for j in lower_list[0:i])): # 检查是否存在满足条件的项 # for ll,dl in zip(reversed(lower_list[0:i]),reversed(date_list[0:i])): # 从后往前查找最近的 # if ll>=y: # new_list.append(dl) # break # else: # new_list.append(None) # df['prev']=new_list # df['prev']=pd.to_datetime(df['prev']) # return df # 基准测试结果: # enumerate: 1min 13s ± 2.17 s per loop (mean ± std. dev. of 2 runs, 2 loops each)
这种方法虽然比df.apply()略快,但其时间复杂度仍然较高,因为它在每次迭代中都可能需要遍历之前的所有元素(reversed(lower_list[0:i])),导致整体性能依然不佳,对于大规模数据仍无法满足要求。
本文详细探讨了在Pandas DataFrame中查找满足特定条件的最新历史索引的问题,并对比了多种实现方法的性能。
注意事项:
总之,在Pandas数据处理中遇到涉及历史状态依赖的复杂条件查找时,深入理解问题特性,并灵活运用Python的内置高效算法(如bisect),是实现高性能数据处理的关键。
以上就是Pandas高效数据处理:利用bisect优化条件查找最新匹配索引的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号