0

0

Redshift数据库中从DataFrame高效批量插入数据的策略与实践

心靈之曲

心靈之曲

发布时间:2025-11-25 14:33:00

|

170人浏览过

|

来源于php中文网

原创

Redshift数据库中从DataFrame高效批量插入数据的策略与实践

本教程旨在解决从python dataframe向amazon redshift数据库高效批量插入数据的挑战。文章将深入探讨传统逐行或小批量插入方法的性能瓶颈,并提出两种优化策略:利用`psycopg2.extras.execute_values`实现多行sql插入,以及更推荐的、通过amazon s3服务结合redshift的`copy`命令进行大规模数据加载。通过具体代码示例和最佳实践,帮助开发者显著提升数据导入效率。

在处理大规模数据集成任务时,将Python DataFrame中的数据导入Amazon Redshift这类分析型数据库,常常面临性能瓶颈。传统的逐行插入或小批量executemany方法,对于数十万甚至数百万条记录的数据集来说,效率低下,可能导致数天的时间消耗甚至连接超时。Redshift作为一款大规模并行处理(MPP)的列式存储数据库,其设计哲学是优化大规模数据的批量加载,而非频繁的单行或小批量操作。

理解Redshift的批量加载机制

Redshift的性能优势在于其分布式架构和列式存储。每次执行SQL插入操作,即使是executemany,如果底层仍然是发送多条独立的INSERT语句,或者每次只插入少量数据,都会引入大量的网络往返开销、事务开销以及数据库内部的元数据处理开销。这与Redshift期望的“一次性加载大量数据”的工作模式相悖。官方文档明确指出,当无法使用COPY命令时,应尽可能使用多行插入(multi-row insert),因为单行或少量行的数据添加会导致数据压缩效率低下。

优化策略一:使用多行SQL插入 (psycopg2.extras.execute_values)

对于无法直接使用COPY命令的场景,或数据量相对较小(但仍远超单行)时,多行SQL插入是比逐行或executemany更优的选择。psycopg2库提供了psycopg2.extras.execute_values函数,它可以将多行数据构建成一个单一的SQL INSERT INTO ... VALUES (...), (...), ... 语句,从而显著减少与数据库的交互次数。

示例代码:

import psycopg2
import pandas as pd
from psycopg2 import extras
import io

# 假设 df 是您的 DataFrame
# df = pd.DataFrame(...)

# 示例数据 (与原问题保持一致的结构)
data = [
    {'case_id': 69370, 'column_name': 'subject', 'split_text': 'working', 'split_text_cnt': 1, 'load_ts': '2023-12-15'},
    {'case_id': 69370, 'column_name': 'subject', 'split_text': 'scenes', 'split_text_cnt': 1, 'load_ts': '2023-12-15'},
    {'case_id': 69370, 'column_name': 'subject', 'split_text': 'intended', 'split_text_cnt': 1, 'load_ts': '2023-12-15'},
    {'case_id': 69371, 'column_name': 'subject', 'split_text': 'redirected', 'split_text_cnt': 1, 'load_ts': '2023-12-15'},
    {'case_id': 69371, 'column_name': 'subject', 'split_text': 'ge', 'split_text_cnt': 2, 'load_ts': '2023-12-15'},
    {'case_id': 69371, 'column_name': 'subject', 'split_text': 'sensor', 'split_text_cnt': 1, 'load_ts': '2023-12-15'},
    {'case_id': 69371, 'column_name': 'subject', 'split_text': 'flush', 'split_text_cnt': 1, 'load_ts': '2023-12-15'},
    {'case_id': 69371, 'column_name': 'subject', 'split_text': 'motion', 'split_text_cnt': 1, 'load_ts': '2023-12-15'},
    {'case_id': 69371, 'column_name': 'subject', 'split_text': 'led', 'split_text_cnt': 1, 'load_ts': '2023-12-15'},
    {'case_id': 69371, 'column_name': 'subject', 'split_text': 'fixture', 'split_text_cnt': 1, 'load_ts': '2023-12-15'},
    {'case_id': 69371, 'column_name': 'subject', 'split_text': 'contact', 'split_text_cnt': 1, 'load_ts': '2023-12-15'}
]
df = pd.DataFrame(data)

# Redshift 连接参数
conn_params = {
    'host': 'redshift-####-dev.00000.us-east-1.redshift.amazonaws.com',
    'database': '*****',
    'user': '****',
    'password': '*****',
    'port': '5439'
}

table_name = 'odey.sfc_ca_sit_di' # 目标表名
columns = ['case_id', 'column_name', 'split_text', 'split_text_cnt', 'load_ts'] # 目标表的列名

try:
    conn = psycopg2.connect(**conn_params)
    print("成功连接到 Redshift Dev")
    cur = conn.cursor()

    # 将DataFrame转换为元组列表,顺序与目标列一致
    values = [tuple(row) for row in df[columns].values]

    # Redshift SQL 命令的最大大小为16MB,因此需要分批插入
    batch_size = 10000 # 根据实际情况调整批次大小,确保SQL语句不超过16MB

    for i in range(0, len(values), batch_size):
        batch = values[i:i + batch_size]
        # 使用 execute_values 构建多行插入语句
        extras.execute_values(
            cur,
            f"INSERT INTO {table_name} ({','.join(columns)}) VALUES %s",
            batch
        )
        conn.commit() # 每批次提交一次
        print(f"已插入 {min(i + batch_size, len(values))} 条记录。")

    print("数据批量插入完成。")

except Exception as e:
    print(f"插入数据时发生错误: {e}")
    if conn:
        conn.rollback() # 发生错误时回滚
finally:
    if cur:
        cur.close()
    if conn:
        conn.close()
    print("数据库连接已关闭。")

注意事项:

  • 批次大小 (batch_size): Redshift SQL 命令的最大大小为16MB。因此,即使使用execute_values,也需要根据每行数据的大小和总行数进行分批处理,以避免SQL语句过大。通常,数千到数万行的批次是合理的起点,具体数值需要根据数据宽度进行测试。
  • 事务管理: 建议每批次提交一次事务(conn.commit()),以平衡性能和数据一致性。过大的事务可能导致长时间锁定和内存问题,而过小的事务则会增加提交开销。

优化策略二:通过Amazon S3和COPY命令进行大规模数据加载(推荐)

对于大规模数据集(如数十万到数百万条记录,甚至TB级别),Redshift的COPY命令是最高效、最推荐的数据加载方式。COPY命令允许Redshift直接从Amazon S3存储桶中并行加载数据,利用其分布式架构的全部能力。

网趣网上购物系统HTML静态版
网趣网上购物系统HTML静态版

网趣购物系统静态版支持网站一键静态生成,采用动态进度条模式生成静态,生成过程更加清晰明确,商品管理上增加淘宝数据包导入功能,与淘宝数据同步更新!采用领先的AJAX+XML相融技术,速度更快更高效!系统进行了大量的实用性更新,如优化核心算法、增加商品图片批量上传、谷歌地图浏览插入等,静态版独特的生成算法技术使静态生成过程可随意掌控,从而可以大大减轻服务器的负担,结合多种强大的SEO优化方式于一体,使

下载

核心步骤:

  1. 将DataFrame保存到S3: 将DataFrame数据转换为文件格式(如CSV、Parquet等),并上传到Amazon S3存储桶。
  2. 执行Redshift COPY命令: 在Redshift中执行COPY命令,指示其从S3存储桶加载数据。

示例代码:

首先,确保您已安装boto3(AWS SDK for Python)和pandas。

import psycopg2
import pandas as pd
import boto3
import io

# 假设 df 是您的 DataFrame
# df = pd.DataFrame(...)

# 示例数据 (与原问题保持一致的结构)
data = [
    {'case_id': 69370, 'column_name': 'subject', 'split_text': 'working', 'split_text_cnt': 1, 'load_ts': '2023-12-15'},
    {'case_id': 69370, 'column_name': 'subject', 'split_text': 'scenes', 'split_text_cnt': 1, 'load_ts': '2023-12-15'},
    {'case_id': 69370, 'column_name': 'subject', 'split_text': 'intended', 'split_text_cnt': 1, 'load_ts': '2023-12-15'},
    {'case_id': 69371, 'column_name': 'subject', 'split_text': 'redirected', 'split_text_cnt': 1, 'load_ts': '2023-12-15'},
    {'case_id': 69371, 'column_name': 'subject', 'split_text': 'ge', 'split_text_cnt': 2, 'load_ts': '2023-12-15'},
    {'case_id': 69371, 'column_name': 'subject', 'split_text': 'sensor', 'split_text_cnt': 1, 'load_ts': '2023-12-15'},
    {'case_id': 69371, 'column_name': 'subject', 'split_text': 'flush', 'split_text_cnt': 1, 'load_ts': '2023-12-15'},
    {'case_id': 69371, 'column_name': 'subject', 'split_text': 'motion', 'split_text_cnt': 1, 'load_ts': '2023-12-15'},
    {'case_id': 69371, 'column_name': 'subject', 'split_text': 'led', 'split_text_cnt': 1, 'load_ts': '2023-12-15'},
    {'case_id': 69371, 'column_name': 'subject', 'split_text': 'fixture', 'split_text_cnt': 1, 'load_ts': '2023-12-15'},
    {'case_id': 69371, 'column_name': 'subject', 'split_text': 'contact', 'split_text_cnt': 1, 'load_ts': '2023-12-15'}
]
df = pd.DataFrame(data)

# Redshift 连接参数
conn_params = {
    'host': 'redshift-####-dev.00000.us-east-1.redshift.amazonaws.com',
    'database': '*****',
    'user': '****',
    'password': '*****',
    'port': '5439'
}

# S3 配置
s3_bucket = 'your-s3-bucket-name' # 替换为您的S3存储桶名称
s3_key = 'data/temp_redshift_load.csv' # S3文件路径
iam_role_arn = 'arn:aws:iam::123456789012:role/YourRedshiftIAMRole' # 替换为您的Redshift IAM角色ARN

table_name = 'odey.sfc_ca_sit_di' # 目标表名

try:
    # 1. 将DataFrame保存到S3
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False, header=False) # Redshift COPY通常不需要header

    s3_client = boto3.client('s3', region_name='us-east-1') # 替换为您的AWS区域
    s3_client.put_object(Bucket=s3_bucket, Key=s3_key, Body=csv_buffer.getvalue())
    print(f"数据已成功上传到 S3: s3://{s3_bucket}/{s3_key}")

    # 2. 连接Redshift并执行COPY命令
    conn = psycopg2.connect(**conn_params)
    print("成功连接到 Redshift Dev")
    cur = conn.cursor()

    # 构建COPY命令
    # 注意:这里的列顺序必须与CSV文件中的数据顺序一致
    copy_sql = f"""
    COPY {table_name} ({','.join(df.columns)})
    FROM 's3://{s3_bucket}/{s3_key}'
    IAM_ROLE '{iam_role_arn}'
    CSV
    DELIMITER ','
    IGNOREHEADER 0; -- 如果CSV没有头部,设置为0
    """
    # 如果CSV有头部,设置为 IGNOREHEADER 1

    cur.execute(copy_sql)
    conn.commit()
    print("数据已通过 Redshift COPY 命令成功加载。")

except Exception as e:
    print(f"数据加载过程中发生错误: {e}")
    if conn:
        conn.rollback()
finally:
    if cur:
        cur.close()
    if conn:
        conn.close()
    print("数据库连接已关闭。")
    # 可选:清理S3上的临时文件
    # try:
    #     s3_client.delete_object(Bucket=s3_bucket, Key=s3_key)
    #     print(f"S3临时文件 s3://{s3_bucket}/{s3_key} 已删除。")
    # except Exception as e:
    #     print(f"删除S3文件时发生错误: {e}")

关键配置与最佳实践:

  • IAM角色: Redshift集群需要一个具有访问S3存储桶权限的IAM角色。该角色应具有s3:GetObject和s3:ListBucket权限。将IAM角色的ARN提供给COPY命令。
  • 文件格式:
    • CSV: 简单易用,但对于复杂数据类型可能需要额外处理。
    • Parquet/ORC: 推荐用于大规模数据集。它们是列式存储格式,具有更好的压缩和编码效率,Redshift可以直接利用这些格式的优势进行更高效的加载。使用pyarrow库可以将DataFrame保存为Parquet格式。
  • 压缩: 强烈建议对S3上的数据文件进行压缩(如GZIP、SNAPPY)。Redshift的COPY命令支持多种压缩格式,可以显著减少数据传输量和加载时间。
  • 文件分片: 对于非常大的数据集,将数据分成多个小文件(例如,每个文件大小在1MB到1GB之间,取决于集群大小)并上传到S3,可以使Redshift的多个切片(slice)并行加载数据,进一步提高效率。
  • 错误处理: COPY命令提供了强大的错误处理机制,例如MAXERRORS、NOLOAD、DATEFORMAT、TIMEFORMAT等选项,可以帮助您在加载过程中处理数据不匹配或格式错误。

总结

从Python DataFrame向Amazon Redshift高效批量插入数据,应避免传统的逐行或小批量executemany方法。对于中等规模的数据,可以采用psycopg2.extras.execute_values构建多行SQL插入语句,并注意分批处理以遵守SQL命令大小限制。然而,对于大规模数据集,最推荐且最高效的方法是利用Amazon S3作为中间存储,结合Redshift的COPY命令进行数据加载。通过选择合适的S3文件格式、压缩以及正确的IAM配置,可以充分发挥Redshift的并行处理能力,实现极速的数据导入。

相关专题

更多
python开发工具
python开发工具

php中文网为大家提供各种python开发工具,好的开发工具,可帮助开发者攻克编程学习中的基础障碍,理解每一行源代码在程序执行时在计算机中的过程。php中文网还为大家带来python相关课程以及相关文章等内容,供大家免费下载使用。

755

2023.06.15

python打包成可执行文件
python打包成可执行文件

本专题为大家带来python打包成可执行文件相关的文章,大家可以免费的下载体验。

636

2023.07.20

python能做什么
python能做什么

python能做的有:可用于开发基于控制台的应用程序、多媒体部分开发、用于开发基于Web的应用程序、使用python处理数据、系统编程等等。本专题为大家提供python相关的各种文章、以及下载和课程。

759

2023.07.25

format在python中的用法
format在python中的用法

Python中的format是一种字符串格式化方法,用于将变量或值插入到字符串中的占位符位置。通过format方法,我们可以动态地构建字符串,使其包含不同值。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

618

2023.07.31

python教程
python教程

Python已成为一门网红语言,即使是在非编程开发者当中,也掀起了一股学习的热潮。本专题为大家带来python教程的相关文章,大家可以免费体验学习。

1262

2023.08.03

python环境变量的配置
python环境变量的配置

Python是一种流行的编程语言,被广泛用于软件开发、数据分析和科学计算等领域。在安装Python之后,我们需要配置环境变量,以便在任何位置都能够访问Python的可执行文件。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

547

2023.08.04

python eval
python eval

eval函数是Python中一个非常强大的函数,它可以将字符串作为Python代码进行执行,实现动态编程的效果。然而,由于其潜在的安全风险和性能问题,需要谨慎使用。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

577

2023.08.04

scratch和python区别
scratch和python区别

scratch和python的区别:1、scratch是一种专为初学者设计的图形化编程语言,python是一种文本编程语言;2、scratch使用的是基于积木的编程语法,python采用更加传统的文本编程语法等等。本专题为大家提供scratch和python相关的文章、下载、课程内容,供大家免费下载体验。

707

2023.08.11

Golang gRPC 服务开发与Protobuf实战
Golang gRPC 服务开发与Protobuf实战

本专题系统讲解 Golang 在 gRPC 服务开发中的完整实践,涵盖 Protobuf 定义与代码生成、gRPC 服务端与客户端实现、流式 RPC(Unary/Server/Client/Bidirectional)、错误处理、拦截器、中间件以及与 HTTP/REST 的对接方案。通过实际案例,帮助学习者掌握 使用 Go 构建高性能、强类型、可扩展的 RPC 服务体系,适用于微服务与内部系统通信场景。

8

2026.01.15

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 0.9万人学习

Django 教程
Django 教程

共28课时 | 3.1万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号