
本文旨在介绍如何使用Python的多进程模块multiprocessing,并发执行数据库操作,并有效控制并发线程的最大数量。我们将提供一个简洁高效的解决方案,确保数据库操作在指定的最大并发数下执行,避免资源过度消耗。同时,讨论了进程池的创建与复用,以及数据库连接管理的最佳实践。
并发执行数据库操作
在处理大量数据库操作时,并发执行可以显著提高效率。Python的multiprocessing模块提供了一种创建进程池的方法,从而实现并发执行。
使用 multiprocessing.Pool
multiprocessing.Pool允许我们创建一个工作进程池,并将任务分配给这些进程并行执行。 以下代码展示了如何使用Pool并发执行数据库操作,并限制最大并发数。
from multiprocessing import Pool
def parallel_execute_db(db, statement_list, no_of_threads=10 ):
stmt_count = len(statement_list)
with Pool(processes=min(stmt_count, no_of_threads)) as pool:
return pool.map(db.sqlscript, statement_list)这段代码首先确定要创建的进程数量,该数量是语句列表的长度和最大线程数的较小值。然后,它创建一个进程池,并将db.sqlscript函数映射到语句列表中的每个元素。pool.map函数将语句列表分割成小块,并将这些小块分配给池中的各个进程。每个进程在其分配的小块上执行db.sqlscript函数,并将结果返回给主进程。with语句确保进程池在使用完毕后正确关闭,释放资源。
立即学习“Python免费学习笔记(深入)”;
示例说明:
- db: 数据库连接对象,需要已经建立连接。
- statement_list: 包含需要执行的SQL语句的列表。
- no_of_threads: 允许的最大并发线程数,默认为10。
- db.sqlscript: 接受一个SQL语句作为参数并执行的函数。
注意事项:
安装IJOB系统序列号:ka163-ka169-51tom-54tom-card163-1186 此版本只有个人管理的80%左右的代码! 对个人求职管理的部分文件的代码进行了删除,以便和正版用户区别,不支持发信测试! 对数据库也进行了部分企业管理表的删除,以免有人续写程序! 正版用户包括整个IJOB文件包,大约3M左右!并付送两套本人制作的商业系统 ([企业产品展示系统]、[企业产品展示+购物系统
- db.sqlscript函数必须是可序列化的,以便在进程之间传递。
- 数据库连接对象db需要在每个进程中独立创建,避免多个进程共享同一个连接导致问题。
进程池的创建与复用
创建进程池通常是一个耗时的操作。如果需要频繁地执行并发数据库操作,可以考虑复用进程池,而不是每次都创建新的进程池。
class DBExecutor:
def __init__(self, db_config, pool_size=10):
self.db_config = db_config
self.pool = Pool(processes=pool_size)
def execute_statements(self, statement_list):
# Create DB connection for each process
def init_worker():
global db
db = psycopg2.connect(**self.db_config)
# Function to execute in each process
def execute_statement(statement):
try:
with db.cursor() as cursor:
cursor.execute(statement)
db.commit()
return True
except Exception as e:
print(f"Error executing statement: {statement}, Error: {e}")
db.rollback()
return False
self.pool.map(execute_statement, statement_list)
def close(self):
self.pool.close()
self.pool.join()
# 使用示例
db_config = {
'host': 'localhost',
'database': 'your_database',
'user': 'your_user',
'password': 'your_password'
}
executor = DBExecutor(db_config, pool_size=5)
statements = ["INSERT INTO table1 (col1) VALUES ('value1')", "UPDATE table2 SET col2 = 'value2' WHERE id = 1"]
executor.execute_statements(statements)
executor.close()在这个例子中,DBExecutor类创建并管理一个进程池。execute_statements方法接受一个SQL语句列表,并将这些语句分配给进程池中的进程执行。close方法用于关闭进程池,释放资源。
数据库连接管理
数据库连接是宝贵的资源,需要谨慎管理。最佳实践是,由"所有者"创建和销毁数据库连接。这意味着,创建连接的代码也应该负责关闭连接,避免资源泄漏。
推荐做法:
- 在每个进程中独立创建数据库连接。
- 使用with语句管理数据库连接,确保连接在使用完毕后自动关闭。
- 避免在内部函数中修改或关闭外部函数的数据库连接。
错误示例:
def outer_function(db):
def inner_function(statement):
# 不推荐:在内部函数中关闭外部函数的数据库连接
db.close()
inner_function("SELECT * FROM table")总结
使用Python的multiprocessing模块可以方便地实现并发数据库操作。通过控制最大并发数,可以避免资源过度消耗。同时,合理的进程池管理和数据库连接管理是保证程序稳定性和性能的关键。希望本文提供的教程能够帮助您更好地使用Python进行并发数据库编程。









