
本文旨在解决使用or-tools `linear_solver`处理大规模分配问题时遇到的性能瓶颈。针对n值超过40-50的工人-任务分配问题,`linear_solver`的求解时间显著增加。通过分析问题特性,我们推荐切换至`cp-sat`求解器。`cp-sat`专为整数规划设计,能显著提升求解速度,并能有效处理浮点系数,通过内部缩放机制将其转换为整数进行优化,从而在保持模型精度的同时,实现更高效的计算。
在复杂的资源分配场景中,例如将工人分配给任务,并要求最小化最高成本与最低成本之间的差异,同时满足多种基于工人ID的约束条件(如特定任务只能由特定ID的工人完成、某些任务组必须由相同ID的工人完成、或某些任务组的工人ID值之和受限),精确建模至关重要。OR-Tools库提供了强大的线性规划和混合整数规划(MIP)求解器,但当问题规模(N值,例如工人/任务数量)增大时,通用MIP求解器(如linear_solver结合SCIP后端)的求解时间可能会迅速变得不切实际。
原始代码使用ortools.linear_solver.pywraplp模块,并选择SCIP作为后端求解器。该模型成功地构建了分配变量x[i, j]、任务ID变量tasks_ids[j],并设置了以下约束:
目标函数设定为最小化所有工人分配成本中的最大值与最小值之间的差异。这通过引入辅助变量max_cost和min_cost来实现。
尽管模型逻辑正确,但当工人/任务数量N超过40时,求解时间开始变得难以接受;N超过50时,甚至在10分钟内都无法得到结果。这表明对于此类规模的问题,当前的求解策略存在效率瓶颈。尝试调整求解器参数(如PRESOLVE_ON)或提供初始解,对于通用MIP求解器而言,可能效果有限,且并非所有参数都直接通过Python API暴露。
鉴于该分配问题本质上是一个纯整数模型(尽管成本和目标函数涉及浮点数,但决策变量x[i, j]是二进制的,任务ID也是整数),ortools.sat.python.cp_model模块中的CP-SAT(Constraint Programming - Satisfiability)求解器是更优的选择。CP-SAT专为组合优化和约束满足问题设计,通常在处理整数变量和复杂逻辑约束方面比通用MIP求解器表现出显著的速度优势。
CP-SAT的一个关键特性是它能有效地处理浮点系数。即使模型中存在浮点数(例如成本),CP-SAT也会在内部尝试对它们进行缩放,将其转换为整数,从而利用其高效的整数算术和布尔推理能力。这使得用户无需手动进行复杂的浮点数到整数的转换,同时保证了求解的效率和精度。
以下是将原问题模型迁移到CP-SAT求解器的示例代码。我们将重点关注模型构建方式的改变以及浮点数处理的策略。
from ortools.sat.python import cp_model
import numpy as np
# number of workers and tasks
N = 40 # 尝试更大的N值,例如70
# cost table for each worker-task pairs
np.random.seed(0)
costs = np.random.rand(N,N)*100
# 为了CP-SAT更好地处理浮点数,我们可以将成本缩放到整数
# 例如,乘以1000,然后进行四舍五入
# CP-SAT内部也会进行类似操作,但明确转换有时能提供更好的控制或性能
SCALING_FACTOR = 1000
scaled_costs = (costs * SCALING_FACTOR).astype(int)
# workers IDs
workers_id = (np.random.rand(N)*4).astype(np.uint32)
id_2_idsrt_dict = {0: 'A', 1: 'B', 2: 'C', 3: 'D'}
workers_id_str = [id_2_idsrt_dict[val] for val in workers_id]
# print(f"Worker IDs: {workers_id_str}") # 打印信息可以帮助调试
idsrt_2_id_dict = {}
for id_val, idstr in id_2_idsrt_dict.items():
idsrt_2_id_dict[idstr] = id_val
# print(f"ID string to ID dict: {idsrt_2_id_dict}")
num_workers = len(costs)
num_tasks = len(costs[0])
# Solver
# Create the CP-SAT model
model = cp_model.CpModel()
# Variables
# x[i, j] is an array of 0-1 variables, which will be 1
# if worker i is assigned to task j.
x = {}
for i in range(num_workers):
for j in range(num_tasks):
x[i, j] = model.NewBoolVar(f"x_{i}_{j}") # 使用NewBoolVar创建布尔变量
# Variables
# tasks_ids[j] is a list of integers variables contains each task's assigned worker id.
# 这里需要NewIntVar,因为workers_id是整数
tasks_ids = []
for j in range(num_tasks):
# task_id_j = sum([workers_id[i]*x[i, j] for i in range(num_workers)])
# CP-SAT不能直接对NewBoolVar求和得到NewIntVar,需要使用model.NewIntVar和model.Add
task_id_var = model.NewIntVar(0, max(workers_id), f"task_id_{j}")
model.Add(task_id_var == sum([workers_id[i]*x[i, j] for i in range(num_workers)]))
tasks_ids.append(task_id_var)
# Constraint
# Each worker is assigned to exactly one task.
for i in range(num_workers):
model.Add(sum(x[i, j] for j in range(num_tasks)) == 1)
# Constraint
# Each task is assigned to exactly one worker.
for j in range(num_tasks):
model.Add(sum(x[i, j] for i in range(num_workers)) == 1)
# Constraint
# Task 1 can be assigned only with workers that have the id "A"
model.Add(tasks_ids[1] == idsrt_2_id_dict["A"])
# Constraint
# Tasks 2,4,6 must assigned with workers of the same id
model.Add(tasks_ids[2] == tasks_ids[4])
model.Add(tasks_ids[2] == tasks_ids[6])
# Constraint
# Tasks 10,11,12 must assigned with workers of the same id
model.Add(tasks_ids[10] == tasks_ids[11])
model.Add(tasks_ids[11] == tasks_ids[12])
# Constraint
# Tasks 1,2,3 sum of ids <= 4
model.Add((tasks_ids[1] + tasks_ids[2] + tasks_ids[3]) <= 4)
# Constraint
# Tasks 4,5,6 sum of ids <= 4
model.Add((tasks_ids[4] + tasks_ids[5] + tasks_ids[6]) <= 4)
# Constraint
# Tasks 7,8,9 sum of ids <= 3
model.Add((tasks_ids[7] + tasks_ids[8] + tasks_ids[9]) <= 3)
# Objective
# minimize the difference of assignment higher cost worker and lower cost worker
# list of workers costs for an assignment
assignment_workers_costs_list = []
for i in range(num_workers):
# 注意这里使用 scaled_costs
worker_cost_var = model.NewIntVar(0, int(np.max(scaled_costs)), f"worker_cost_{i}")
model.Add(worker_cost_var == sum([scaled_costs[i][j] * x[i, j] for j in range(num_tasks)]))
assignment_workers_costs_list.append(worker_cost_var)
# Additional variables for max and min costs
# CP-SAT的Max和Min操作需要一个列表的变量
max_cost_var = model.NewIntVar(0, int(np.max(scaled_costs)), 'max_cost')
min_cost_var = model.NewIntVar(0, int(np.max(scaled_costs)), 'min_cost') # min_cost_limit在CP-SAT中通常设为0或最小可能值
# Constraints to update max and min costs
# 使用model.AddMaxEquality和model.AddMinEquality
model.AddMaxEquality(max_cost_var, assignment_workers_costs_list)
model.AddMinEquality(min_cost_var, assignment_workers_costs_list)
# Minimize the difference between max and min costs
model.Minimize(max_cost_var - min_cost_var)
# Create a solver and solve the model.
solver = cp_model.CpSolver()
# 可以设置一些参数来观察求解过程
# solver.parameters.log_search_progress = True
# solver.parameters.num_workers = 8 # 根据CPU核心数调整
print(f"Solving with CP-SAT {solver.parameters.solver_version}")
status = solver.Solve(model)
# Print solution.
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
# 目标值需要除以缩放因子
objective_diff = solver.ObjectiveValue() / SCALING_FACTOR
print(f"Difference (scaled back)= {objective_diff:.2f}\n")
for i in range(num_workers):
for j in range(num_tasks):
if solver.Value(x[i, j]) > 0.5:
# 成本也需要除以缩放因子
original_cost = costs[i][j]
print(f"Worker {i} ({workers_id_str[i]}) assigned to task {j}." + f" Cost: {original_cost:.2f}")
else:
print("No solution found.")在CP-SAT中,所有变量和约束通常都期望是整数。当原始问题包含浮点成本时,有两种主要处理方式:
对于本问题,由于目标是最小化成本差异,而成本本身是浮点数,通过手动缩放,我们可以将所有成本转换为整数,使CP-SAT能更直接地处理整数目标函数。
当使用OR-Tools解决大规模分配问题,特别是当问题本质上是整数规划时,从linear_solver切换到CP-SAT求解器是提升性能的关键策略。CP-SAT凭借其在整数和布尔变量处理上的优势,以及对浮点系数的内部缩放能力,能够显著缩短求解时间,使原本不切实际的问题规模变得可解。在实际应用中,理解CP-SAT的模型构建范式并适当考虑浮点数缩放,将有助于充分发挥其强大性能。
以上就是优化OR-Tools解决大规模分配问题:CP-SAT的性能优势与浮点数缩放的详细内容,更多请关注php中文网其它相关文章!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号