
本文旨在解决使用or-tools `linear_solver`处理大规模分配问题时遇到的性能瓶颈。针对n值超过40-50的工人-任务分配问题,`linear_solver`的求解时间显著增加。通过分析问题特性,我们推荐切换至`cp-sat`求解器。`cp-sat`专为整数规划设计,能显著提升求解速度,并能有效处理浮点系数,通过内部缩放机制将其转换为整数进行优化,从而在保持模型精度的同时,实现更高效的计算。
在复杂的资源分配场景中,例如将工人分配给任务,并要求最小化最高成本与最低成本之间的差异,同时满足多种基于工人ID的约束条件(如特定任务只能由特定ID的工人完成、某些任务组必须由相同ID的工人完成、或某些任务组的工人ID值之和受限),精确建模至关重要。OR-Tools库提供了强大的线性规划和混合整数规划(MIP)求解器,但当问题规模(N值,例如工人/任务数量)增大时,通用MIP求解器(如linear_solver结合SCIP后端)的求解时间可能会迅速变得不切实际。
原始问题与性能瓶颈
原始代码使用ortools.linear_solver.pywraplp模块,并选择SCIP作为后端求解器。该模型成功地构建了分配变量x[i, j]、任务ID变量tasks_ids[j],并设置了以下约束:
- 每个工人分配一个任务。
- 每个任务分配一个工人。
- 特定任务的工人ID限制。
- 多组任务必须由相同ID的工人完成。
- 多组任务的工人ID值之和限制。
目标函数设定为最小化所有工人分配成本中的最大值与最小值之间的差异。这通过引入辅助变量max_cost和min_cost来实现。
尽管模型逻辑正确,但当工人/任务数量N超过40时,求解时间开始变得难以接受;N超过50时,甚至在10分钟内都无法得到结果。这表明对于此类规模的问题,当前的求解策略存在效率瓶颈。尝试调整求解器参数(如PRESOLVE_ON)或提供初始解,对于通用MIP求解器而言,可能效果有限,且并非所有参数都直接通过Python API暴露。
推荐解决方案:CP-SAT求解器
鉴于该分配问题本质上是一个纯整数模型(尽管成本和目标函数涉及浮点数,但决策变量x[i, j]是二进制的,任务ID也是整数),ortools.sat.python.cp_model模块中的CP-SAT(Constraint Programming - Satisfiability)求解器是更优的选择。CP-SAT专为组合优化和约束满足问题设计,通常在处理整数变量和复杂逻辑约束方面比通用MIP求解器表现出显著的速度优势。
CP-SAT的一个关键特性是它能有效地处理浮点系数。即使模型中存在浮点数(例如成本),CP-SAT也会在内部尝试对它们进行缩放,将其转换为整数,从而利用其高效的整数算术和布尔推理能力。这使得用户无需手动进行复杂的浮点数到整数的转换,同时保证了求解的效率和精度。
使用CP-SAT重构模型
以下是将原问题模型迁移到CP-SAT求解器的示例代码。我们将重点关注模型构建方式的改变以及浮点数处理的策略。
from ortools.sat.python import cp_model
import numpy as np
# number of workers and tasks
N = 40 # 尝试更大的N值,例如70
# cost table for each worker-task pairs
np.random.seed(0)
costs = np.random.rand(N,N)*100
# 为了CP-SAT更好地处理浮点数,我们可以将成本缩放到整数
# 例如,乘以1000,然后进行四舍五入
# CP-SAT内部也会进行类似操作,但明确转换有时能提供更好的控制或性能
SCALING_FACTOR = 1000
scaled_costs = (costs * SCALING_FACTOR).astype(int)
# workers IDs
workers_id = (np.random.rand(N)*4).astype(np.uint32)
id_2_idsrt_dict = {0: 'A', 1: 'B', 2: 'C', 3: 'D'}
workers_id_str = [id_2_idsrt_dict[val] for val in workers_id]
# print(f"Worker IDs: {workers_id_str}") # 打印信息可以帮助调试
idsrt_2_id_dict = {}
for id_val, idstr in id_2_idsrt_dict.items():
idsrt_2_id_dict[idstr] = id_val
# print(f"ID string to ID dict: {idsrt_2_id_dict}")
num_workers = len(costs)
num_tasks = len(costs[0])
# Solver
# Create the CP-SAT model
model = cp_model.CpModel()
# Variables
# x[i, j] is an array of 0-1 variables, which will be 1
# if worker i is assigned to task j.
x = {}
for i in range(num_workers):
for j in range(num_tasks):
x[i, j] = model.NewBoolVar(f"x_{i}_{j}") # 使用NewBoolVar创建布尔变量
# Variables
# tasks_ids[j] is a list of integers variables contains each task's assigned worker id.
# 这里需要NewIntVar,因为workers_id是整数
tasks_ids = []
for j in range(num_tasks):
# task_id_j = sum([workers_id[i]*x[i, j] for i in range(num_workers)])
# CP-SAT不能直接对NewBoolVar求和得到NewIntVar,需要使用model.NewIntVar和model.Add
task_id_var = model.NewIntVar(0, max(workers_id), f"task_id_{j}")
model.Add(task_id_var == sum([workers_id[i]*x[i, j] for i in range(num_workers)]))
tasks_ids.append(task_id_var)
# Constraint
# Each worker is assigned to exactly one task.
for i in range(num_workers):
model.Add(sum(x[i, j] for j in range(num_tasks)) == 1)
# Constraint
# Each task is assigned to exactly one worker.
for j in range(num_tasks):
model.Add(sum(x[i, j] for i in range(num_workers)) == 1)
# Constraint
# Task 1 can be assigned only with workers that have the id "A"
model.Add(tasks_ids[1] == idsrt_2_id_dict["A"])
# Constraint
# Tasks 2,4,6 must assigned with workers of the same id
model.Add(tasks_ids[2] == tasks_ids[4])
model.Add(tasks_ids[2] == tasks_ids[6])
# Constraint
# Tasks 10,11,12 must assigned with workers of the same id
model.Add(tasks_ids[10] == tasks_ids[11])
model.Add(tasks_ids[11] == tasks_ids[12])
# Constraint
# Tasks 1,2,3 sum of ids <= 4
model.Add((tasks_ids[1] + tasks_ids[2] + tasks_ids[3]) <= 4)
# Constraint
# Tasks 4,5,6 sum of ids <= 4
model.Add((tasks_ids[4] + tasks_ids[5] + tasks_ids[6]) <= 4)
# Constraint
# Tasks 7,8,9 sum of ids <= 3
model.Add((tasks_ids[7] + tasks_ids[8] + tasks_ids[9]) <= 3)
# Objective
# minimize the difference of assignment higher cost worker and lower cost worker
# list of workers costs for an assignment
assignment_workers_costs_list = []
for i in range(num_workers):
# 注意这里使用 scaled_costs
worker_cost_var = model.NewIntVar(0, int(np.max(scaled_costs)), f"worker_cost_{i}")
model.Add(worker_cost_var == sum([scaled_costs[i][j] * x[i, j] for j in range(num_tasks)]))
assignment_workers_costs_list.append(worker_cost_var)
# Additional variables for max and min costs
# CP-SAT的Max和Min操作需要一个列表的变量
max_cost_var = model.NewIntVar(0, int(np.max(scaled_costs)), 'max_cost')
min_cost_var = model.NewIntVar(0, int(np.max(scaled_costs)), 'min_cost') # min_cost_limit在CP-SAT中通常设为0或最小可能值
# Constraints to update max and min costs
# 使用model.AddMaxEquality和model.AddMinEquality
model.AddMaxEquality(max_cost_var, assignment_workers_costs_list)
model.AddMinEquality(min_cost_var, assignment_workers_costs_list)
# Minimize the difference between max and min costs
model.Minimize(max_cost_var - min_cost_var)
# Create a solver and solve the model.
solver = cp_model.CpSolver()
# 可以设置一些参数来观察求解过程
# solver.parameters.log_search_progress = True
# solver.parameters.num_workers = 8 # 根据CPU核心数调整
print(f"Solving with CP-SAT {solver.parameters.solver_version}")
status = solver.Solve(model)
# Print solution.
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
# 目标值需要除以缩放因子
objective_diff = solver.ObjectiveValue() / SCALING_FACTOR
print(f"Difference (scaled back)= {objective_diff:.2f}\n")
for i in range(num_workers):
for j in range(num_tasks):
if solver.Value(x[i, j]) > 0.5:
# 成本也需要除以缩放因子
original_cost = costs[i][j]
print(f"Worker {i} ({workers_id_str[i]}) assigned to task {j}." + f" Cost: {original_cost:.2f}")
else:
print("No solution found.")CP-SAT模型构建要点:
- 导入模块: 使用 from ortools.sat.python import cp_model。
- 创建模型: model = cp_model.CpModel()。
-
变量声明:
- 布尔变量(0或1)使用 model.NewBoolVar(name)。
- 整数变量使用 model.NewIntVar(lower_bound, upper_bound, name)。
- 注意,tasks_ids 和 assignment_workers_costs_list 中的变量必须是model.NewIntVar创建的变量,而不是直接的Python整数或浮点数。
-
添加约束: 所有约束都通过 model.Add(...) 方法添加。
- 等式约束:model.Add(expr == value)。
- 不等式约束:model.Add(expr = value)。
- 对于最大/最小值的辅助变量,CP-SAT提供了 model.AddMaxEquality(target_var, list_of_vars) 和 model.AddMinEquality(target_var, list_of_vars),这比手动添加循环约束更简洁高效。
- 目标函数: 使用 model.Minimize(expression) 或 model.Maximize(expression)。表达式必须是CP-SAT变量或其线性组合。
- 求解: 创建 solver = cp_model.CpSolver() 实例,然后调用 solver.Solve(model)。
- 获取结果: 使用 solver.Value(variable) 获取变量的解。
浮点数处理与缩放
在CP-SAT中,所有变量和约束通常都期望是整数。当原始问题包含浮点成本时,有两种主要处理方式:
- CP-SAT内部自动缩放: CP-SAT在内部会尝试检测浮点系数并对其进行缩放以转换为整数。这通常是默认且推荐的方式,因为它能自动处理精度问题。在日志中,可以看到CP-SAT关于缩放的报告。
- 手动缩放: 如示例代码所示,可以手动将所有浮点成本乘以一个足够大的整数(SCALING_FACTOR),然后四舍五入转换为整数。这样做可以确保所有输入都是整数,并且可能在某些情况下提供更可控的精度或性能。在获取最终结果时,需要将目标值和相关成本除以相同的缩放因子还原。
对于本问题,由于目标是最小化成本差异,而成本本身是浮点数,通过手动缩放,我们可以将所有成本转换为整数,使CP-SAT能更直接地处理整数目标函数。
性能调优与注意事项
- CP-SAT参数: CP-SAT拥有丰富的参数集,可以通过 solver.parameters 进行访问和调整。例如,solver.parameters.log_search_progress = True 可以打印详细的求解日志,帮助理解求解器的行为。solver.parameters.num_workers 可以设置并行求解的线程数,充分利用多核CPU。
- 问题规模: 即使CP-SAT效率更高,对于极其庞大的问题(N远超70甚至上百),求解时间仍然可能很长。此时,可能需要考虑问题分解、启发式算法或近似求解策略。
- 约束紧密性: 在CP-SAT中,更紧密、更强的约束通常能帮助求解器更快地剪枝搜索空间。尽量利用CP-SAT提供的各种约束类型(如AddAllDifferent、AddCircuit等)来精确表达问题逻辑。
- 初始解: CP-SAT通常不直接接受外部提供的初始解来“热启动”求解过程。然而,对于某些特定问题,可以通过自定义搜索策略或利用启发式方法生成一个好的初始解,然后将其作为模型的一个约束(例如,将某些变量固定为初始解的值),从而引导求解器。但对于一般分配问题,直接切换到CP-SAT通常已是最大的性能提升。
总结
当使用OR-Tools解决大规模分配问题,特别是当问题本质上是整数规划时,从linear_solver切换到CP-SAT求解器是提升性能的关键策略。CP-SAT凭借其在整数和布尔变量处理上的优势,以及对浮点系数的内部缩放能力,能够显著缩短求解时间,使原本不切实际的问题规模变得可解。在实际应用中,理解CP-SAT的模型构建范式并适当考虑浮点数缩放,将有助于充分发挥其强大性能。











