
在web开发中,尤其是使用flask这样的微服务框架时,一个基本原则是web服务器应专注于处理http请求并快速响应,而不应执行耗时或阻塞性的后台任务,如数据抓取(scraping)或文件i/o操作。将这类任务直接嵌入到flask应用的主线程中,会导致请求响应延迟,甚至阻塞其他用户的请求。因此,对于“每10分钟自动刷新csv文件”的需求,最佳实践是将数据更新逻辑与flask应用本身解耦,让其在独立的进程中运行。flask应用只需负责读取已更新的csv文件。
对于部署在Linux/Unix环境下的应用,Cron Job是最简单直接的定时任务解决方案。它允许用户在操作系统层面配置定时执行脚本或命令。
工作原理:
示例(update_csv.py):
# update_csv.py
import pandas as pd
import datetime
import os
def scrape_and_update_csv():
# 模拟数据抓取和处理
print(f"[{datetime.datetime.now()}] 开始抓取数据并更新CSV...")
data = {
'game': ['Game A', 'Game B', 'Game C'],
'stake': [1.5, 2.0, 1.8],
'timestamp': [datetime.datetime.now()] * 3
}
df = pd.DataFrame(data)
# 定义CSV文件路径
# 注意:这里的路径应是绝对路径,以便cron正确找到
csv_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data.csv')
# 将数据保存到CSV
df.to_csv(csv_file_path, index=False)
print(f"[{datetime.datetime.now()}] CSV文件已更新:{csv_file_path}")
if __name__ == "__main__":
scrape_and_update_csv()配置Cron Job:
打开终端,输入 crontab -e。
在打开的文件末尾添加一行(确保Python环境和脚本路径正确):
*/10 * * * * /usr/bin/python3 /path/to/your/update_csv.py >> /path/to/your/cron.log 2>&1
优点: 简单、可靠,系统资源占用低。 缺点: 平台依赖(主要用于类Unix系统),任务管理不够灵活(例如,难以从Python代码中动态调度或取消任务)。
APScheduler(Advanced Python Scheduler)是一个轻量级的Python库,允许你在Python应用内部(或独立脚本中)安排任务。它支持多种调度器类型,如BlockingScheduler(用于独立脚本)和BackgroundScheduler(用于在应用内部以非阻塞方式运行)。
工作原理:
示例(scheduler_app.py):
# scheduler_app.py
from apscheduler.schedulers.blocking import BlockingScheduler
import pandas as pd
import datetime
import os
import logging
# 配置日志,方便调试
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def scrape_and_update_csv():
logging.info("开始抓取数据并更新CSV...")
try:
# 模拟数据抓取和处理
data = {
'game': [f'Game {i}' for i in range(1, 4)],
'stake': [1.5 + i * 0.1 for i in range(3)],
'timestamp': [datetime.datetime.now()] * 3
}
df = pd.DataFrame(data)
# 定义CSV文件路径
# 同样建议使用绝对路径
csv_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data.csv')
# 将数据保存到CSV
df.to_csv(csv_file_path, index=False)
logging.info(f"CSV文件已更新:{csv_file_path}")
except Exception as e:
logging.error(f"更新CSV文件失败: {e}")
if __name__ == '__main__':
scheduler = BlockingScheduler()
# 每10分钟执行一次 scrape_and_update_csv 函数
scheduler.add_job(scrape_and_update_csv, 'interval', minutes=10)
logging.info("APScheduler已启动,等待任务执行...")
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
logging.info("APScheduler已停止。")运行方式: 将scheduler_app.py作为一个独立的Python脚本运行:python3 scheduler_app.py。 Flask应用和这个调度器脚本将作为两个独立的进程运行。
优点: 跨平台,纯Python实现,任务管理更灵活。 缺点: 仍需独立进程运行,不适合分布式任务。
对于更复杂、需要分布式处理、或者任务执行时间可能较长的场景,Celery是一个强大的选择。它是一个异步任务队列/基于分布式消息传递的作业队列,可以处理大量操作。
工作原理:
示例概述:
安装: pip install celery redis (如果使用Redis作为Broker)。
创建Celery应用(celery_app.py):
# celery_app.py
from celery import Celery
import pandas as pd
import datetime
import os
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
celery_app = Celery(
'csv_updater',
broker='redis://localhost:6379/0', # 替换为你的Redis地址
backend='redis://localhost:6379/0'
)
@celery_app.task
def scrape_and_update_csv_task():
logging.info("Celery任务:开始抓取数据并更新CSV...")
try:
data = {
'game': [f'Game {i}' for i in range(1, 4)],
'stake': [1.5 + i * 0.1 for i in range(3)],
'timestamp': [datetime.datetime.now()] * 3
}
df = pd.DataFrame(data)
csv_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data.csv')
df.to_csv(csv_file_path, index=False)
logging.info(f"Celery任务:CSV文件已更新:{csv_file_path}")
except Exception as e:
logging.error(f"Celery任务:更新CSV文件失败: {e}")启动Celery Worker: 在终端中运行:celery -A celery_app worker --loglevel=info
使用Celery Beat进行定时调度: 创建celeryconfig.py文件:
# celeryconfig.py
from datetime import timedelta
CELERY_BEAT_SCHEDULE = {
'update-csv-every-10-minutes': {
'task': 'celery_app.scrape_and_update_csv_task',
'schedule': timedelta(minutes=10),
'args': (),
},
}
CELERY_TIMEZONE = 'Asia/Shanghai' # 根据需要设置时区启动Celery Beat:celery -A celery_app beat -s celerybeat-schedule --loglevel=info
优点: 强大、可扩展、支持分布式、任务重试、结果存储等高级功能,适用于生产环境复杂任务。 缺点: 配置相对复杂,引入了额外的组件(Broker、Worker、Beat)。
当一个后台进程定时更新CSV文件,而Flask应用同时尝试读取该文件时,可能会出现数据不一致或文件锁定问题。
原子性写入: 避免直接覆盖正在读取的文件。最佳实践是:
# 修改 scrape_and_update_csv 函数
def scrape_and_update_csv():
# ... 数据抓取逻辑 ...
csv_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data.csv')
temp_file_path = csv_file_path + '.tmp'
df.to_csv(temp_file_path, index=False) # 写入临时文件
os.replace(temp_file_path, csv_file_path) # 原子性替换
logging.info(f"CSV文件已更新:{csv_file_path}")数据库替代方案: 如果数据量较大或对并发访问有更高要求,将数据存储在数据库(如SQLite,因为Flask应用已配置SQLAlchemy)而不是CSV文件是更健壮的选择。
无论采用哪种后台更新策略,Flask应用在处理用户请求时,只需从固定的CSV文件路径读取数据即可。
# from .views import views (假设在views.py中)
from flask import Blueprint, render_template
import pandas as pd
import os
views = Blueprint('views', __name__)
@views.route('/')
def home():
csv_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data.csv')
# 确保文件存在,并处理可能的文件不存在或读取错误
if os.path.exists(csv_file_path):
try:
df = pd.read_csv(csv_file_path)
# 假设你的CSV有 'game' 和 'stake' 列
games_data = df.to_dict(orient='records')
return render_template("home.html", games=games_data)
except Exception as e:
print(f"读取CSV文件失败: {e}")
return render_template("home.html", games=[], error="无法加载数据")
else:
return render_template("home.html", games=[], error="数据文件不存在,请稍后再试")
请注意,os.path.dirname(os.path.abspath(__file__)) 仅在当前文件直接运行时有效。在Flask应用中,你需要确保data.csv的路径是相对于你的项目根目录或Flask应用实例可访问的路径。
在Flask等Web应用中实现定时数据刷新,核心原则是将耗时操作从Web服务器的请求-响应循环中分离出来。本文介绍了三种主流策略:
无论选择哪种方案,都应注意数据一致性和文件访问的原子性。对于更严谨的数据管理,将数据存储在数据库中是比CSV文件更推荐的方案,因为它提供了更完善的并发控制和数据完整性保障。通过合理选择和实施这些策略,可以确保Flask应用能够稳定、高效地提供最新数据,同时保持其响应性。
以上就是Flask应用中定时刷新CSV数据的高效策略的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号