
本教程详细介绍了如何使用simpy库构建一个离散事件模拟模型,以模拟复杂的工厂生产线。文章涵盖了simpy环境设置、资源定义(如操作员、机器人、工装夹具)、零件生产流程的建模,以及资源请求与释放的关键机制。通过一个实际的工厂案例,演示了如何管理并发进程和共享资源,并讨论了模拟中可能出现的死锁问题及资源管理最佳实践。
SimPy是一个强大的Python库,用于进行离散事件模拟。它允许用户通过定义“进程”(processes)和“资源”(resources)来模拟系统中的并发活动。在工厂生产线建模中,零件的加工、机器人的操作、操作员的协作等都可以被视为SimPy中的进程,而机器人、操作员、工装夹具等则是共享资源。
在构建模拟模型时,通常会定义一些全局常量,以便于管理和调整模拟参数。这些常量可以包括模拟时长、资源数量以及各类操作的时间消耗等。
import simpy
import itertools
class g: # 全局常量类
t_sim = 600 # 模拟最大时长,单位:秒 (原为24*60*60,此处为简化调试改为600)
operator_qty = 2 # 操作员数量
sWeld_t = 2.75 # 点焊时间 (秒/次)
Adh_t = 1/60 # 涂胶时间 (秒/毫米)
ArcStud_t = 5.5 # 弧焊螺柱时间 (秒/次)
ProjStud_t = 10 # 凸焊螺柱时间 (秒/次)在这个g类中,我们定义了模拟总时长t_sim、操作员数量operator_qty以及各种加工步骤的单位时间,如点焊时间sWeld_t、涂胶时间Adh_t等。
cell_model类是整个工厂单元的核心,它负责初始化SimPy环境、定义所有共享资源,并启动零件生成进程。
在cell_model的__init__方法中,我们创建SimPy环境并声明所有需要的资源。
class cell_model: # 代表整个工厂单元
def __init__(self): # 单元的初始条件
self.env = simpy.Environment() # 创建SimPy环境
self._dict=dict() # 备用字典,此处未使用
self.part_counter = 0 # 零件计数器,从零开始
# 定义操作员资源,使用PriorityResource以支持优先级调度
# 操作员数量有限,且可在不同操作类型之间切换
self.OP = simpy.PriorityResource(self.env, capacity=g.operator_qty)
# 定义机器人资源,每个机器人一次只能处理一个零件
self.R1 = simpy.PriorityResource(self.env, capacity=1)
self.R2 = simpy.PriorityResource(self.env, capacity=1)
self.R3 = simpy.PriorityResource(self.env, capacity=1)
self.R4 = simpy.PriorityResource(self.env, capacity=1)
# 定义工装夹具资源,每个夹具一次只能放置一个零件
self.Fix01 = simpy.PriorityResource(self.env, capacity=1)
self.Fix02 = simpy.PriorityResource(self.env, capacity=1)
self.Fix101 = simpy.PriorityResource(self.env, capacity=1)
# 启动零件生成器进程
self.env.process(self.part_generator())这里,我们使用simpy.PriorityResource来定义资源。PriorityResource允许请求者在请求资源时指定一个优先级。在本例中,p_Id(零件ID)被用作优先级,确保零件按照它们进入系统的顺序(先进先出,FIFO)进行处理。
make_part方法定义了单个零件在工厂中经历的一系列加工步骤。每个步骤都可能需要特定的资源,并且会消耗一定的时间。
def make_part(self, p_Id): # 描述单个零件通过单元的完整流程
# --- 步骤1: 操作员处理零件并将其送至工装夹具01 ---
OP_req = self.OP.request(priority=p_Id) # 请求操作员
print(f"part {p_Id} 请求操作员 OP 在 {self.env.now} 秒")
yield OP_req # 等待操作员可用
print(f"part {p_Id} 占用操作员 OP 在 {self.env.now} 秒")
print("Part ", p_Id, " 已准备就绪在 ", self.env.now, sep="")
yield self.env.timeout(29) # 操作员搬运、贴胶带、放置到夹具01
# --- 步骤2: 放置到工装夹具01并加载零件 ---
Fix01_req = self.Fix01.request(priority=p_Id) # 请求工装夹具01
print(f"part {p_Id} 请求工装夹具 Fix01 在 {self.env.now} 秒")
yield Fix01_req # 等待夹具01可用
print(f"part {p_Id} 占用工装夹具 Fix01 在 {self.env.now} 秒")
yield self.env.timeout(27) # 操作员加载零件,退出,启动
yield self.OP.release(OP_req) # 释放操作员
print(f"part {p_Id} 释放操作员 OP 在 {self.env.now} 秒")
print("操作员释放 Part ", p_Id, " 在 ", self.env.now, sep="")
yield self.env.timeout(0) # 稍作停顿
# --- 步骤3: 机器人R2在夹具01进行点焊 ---
# 使用with语句管理资源,确保资源在块结束时自动释放
with self.R2.request(priority=p_Id) as R2_req:
print(f"part {p_Id} 请求机器人 R2 (初次) 在 {self.env.now} 秒")
yield R2_req # 等待机器人R2可用
print(f"part {p_Id} 占用机器人 R2 (初次) 在 {self.env.now} 秒")
print("R2 移动到 Fix01 为 Part ", p_Id, " 工作在 ", self.env.now, sep="")
yield self.env.timeout(g.sWeld_t * 11) # R2进行11次点焊
print(f"part {p_Id} 释放机器人 R2 (初次) 在 {self.env.now} 秒")
# --- 步骤4: 机器人R1将零件移出夹具01并进行焊接 ---
R1_req = self.R1.request(priority=p_Id) # 请求机器人R1
print(f"part {p_Id} 请求机器人 R1 在 {self.env.now} 秒")
yield R1_req # 等待机器人R1可用
print(f"part {p_Id} 占用机器人 R1 在 {self.env.now} 秒")
yield self.env.timeout(8) # R1将零件移出Fix01
print("Part ", p_Id, " 离开 Fix01 在 ", self.env.now, sep="")
yield self.Fix01.release(Fix01_req) # 释放工装夹具01
print(f"part {p_Id} 释放工装夹具 Fix01 在 {self.env.now} 秒")
yield self.env.timeout(g.sWeld_t * 13) # 进行13次点焊
yield self.env.timeout(0)
yield self.env.timeout(g.ArcStud_t * 5) # 进行5次弧焊螺柱
yield self.env.timeout(8) # R1移动到工装夹具101放置点
# --- 步骤5: 放置到工装夹具101 ---
Fix101_req = self.Fix101.request(priority=p_Id) # 请求工装夹具101
print(f"part {p_Id} 请求工装夹具 Fix101 在 {self.env.now} 秒")
yield Fix101_req # 等待夹具101可用
print(f"part {p_Id} 占用工装夹具 Fix101 在 {self.env.now} 秒")
print("Part ", p_Id, " 进入 Fix101 在 ", self.env.now, sep="")
yield self.R1.release(R1_req) # 释放机器人R1
print(f"part {p_Id} 释放机器人 R1 在 {self.env.now} 秒")
# --- 步骤6: 机器人R4在夹具101进行弧焊螺柱 ---
with self.R4.request(priority=p_Id) as R4_req:
print(f"part {p_Id} 请求机器人 R4 在 {self.env.now} 秒")
yield R4_req # 等待机器人R4可用
print(f"part {p_Id} 占用机器人 R4 在 {self.env.now} 秒")
yield self.env.timeout(g.ArcStud_t * 6) # R4在夹具101进行6次弧焊螺柱
print(f"part {p_Id} 释放机器人 R4 在 {self.env.now} 秒")
# --- 步骤7: 机器人R3将零件移出夹具101并进行凸焊螺柱和涂胶 ---
R3_req = self.R3.request(priority=p_Id) # 请求机器人R3
print(f"part {p_Id} 请求机器人 R3 在 {self.env.now} 秒")
yield R3_req # 等待机器人R3可用
print(f"part {p_Id} 占用机器人 R3 在 {self.env.now} 秒")
yield self.env.timeout(8) # R3将零件移出Fix101
print("Part ", p_Id, " 离开 Fix101 在 ", self.env.now, sep="")
yield self.Fix101.release(Fix101_req) # 释放工装夹具101
print(f"part {p_Id} 释放工装夹具 Fix101 在 {self.env.now} 秒")
yield self.env.timeout(g.ProjStud_t * 6) # 进行6次凸焊螺柱
yield self.env.timeout(0)
yield self.env.timeout(225 * g.Adh_t) # 涂255mm的胶
print("Part ", p_Id, " 涂胶阶段完成在 ", self.env.now, sep="")
yield self.env.timeout(8) # R3移动到工装夹具02放置点
# --- 步骤8: 放置到工装夹具02 ---
Fix02_req = self.Fix02.request(priority=p_Id) # 请求工装夹具02
print(f"part {p_Id} 请求工装夹具 Fix02 在 {self.env.now} 秒")
yield Fix02_req # 等待夹具02可用
print(f"part {p_Id} 占用工装夹具 Fix02 在 {self.env.now} 秒")
yield self.env.timeout(0)
print("Part ", p_Id, " 进入 Fix02 在 ", self.env.now, sep="")
yield self.R3.release(R3_req) # 释放机器人R3
print(f"part {p_Id} 释放机器人 R3 在 {self.env.now} 秒")
# --- 步骤9: 操作员在夹具02进行操作 ---
with self.OP.request(priority=p_Id) as OP_req:
print(f"part {p_Id} 请求操作员 OP (二次) 在 {self.env.now} 秒")
yield OP_req # 等待操作员可用
print(f"part {p_Id} 占用操作员 OP (二次) 在 {self.env.now} 秒")
yield self.env.timeout(38) # 操作员进行操作2
yield self.env.timeout(0)
print(f"part {p_Id} 释放操作员 OP (二次) 在 {self.env.now} 秒")
# --- 步骤10: 机器人R2在夹具02完成焊接 (机器人R2的二次使用) ---
with self.R2.request(priority=p_Id) as R2_req:
print(f"part {p_Id} 请求机器人 R2 (二次) 在 {self.env.now} 秒")
yield R2_req # 等待机器人R2可用
print(f"part {p_Id} 占用机器人 R2 (二次) 在 {self.env.now} 秒")
print("R2 移动到 Fix02 为 Part ", p_Id, " 工作在 ", self.env.now, sep="")
yield self.env.timeout(g.sWeld_t * 35) # R2进行35次点焊
yield self.env.timeout(0)
print(f"part {p_Id} 释放机器人 R2 (二次) 在 {self.env.now} 秒")
# --- 步骤11: 操作员移除零件,完成流程 ---
with self.OP.request(priority=p_Id) as OP_req:
print(f"part {p_Id} 请求操作员 OP (三次) 在 {self.env.now} 秒")
yield OP_req # 等待操作员可用
print(f"part {p_Id} 占用操作员 OP (三次) 在 {self.env.now} 秒")
yield self.env.timeout(2) # 操作员移除零件并重新开始循环
yield self.Fix02.release(Fix02_req) # 释放工装夹具02
print(f"part {p_Id} 释放工装夹具 Fix02 在 {self.env.now} 秒")
print("Part ", p_Id, " 离开流程在 ", self.env.now, sep="")
yield self.env.timeout(0)
print(f"part {p_Id} 释放操作员 OP (三次) 在 {self.env.now} 秒")资源请求与释放的关键机制:
在SimPy中,正确管理资源的请求(request())和释放(release())是至关重要的。
显式请求/释放 (resource.request() 和 resource.release()): 这种方式提供了最大的灵活性。当一个进程需要长时间占用某个资源,或者在资源被请求后、释放前需要执行多个不连续的操作时,使用这种方式。例如,操作员将零件从A点搬运到B点,然后才释放,期间可能还有其他操作。在这种情况下,request()返回一个请求事件,yield该事件以等待资源被占用,然后在适当的时机显式调用release()。
with 语句 (with resource.request() as req:): 这是Python上下文管理器的一种应用,它提供了一种更简洁、更安全的方式来管理资源。当进程进入with块时,资源会被请求;当进程退出with块时(无论正常退出还是发生异常),资源都会被自动释放。这对于资源只在特定、连续的代码块中被占用的情况非常方便。
在上述代码中,原始问题中的部分with块会导致资源过早释放。例如,如果一个操作员在将零件放到夹具后就通过with块自动释放了,但实际上该操作员可能还需要等待夹具被占用后才能离开。因此,对于需要跨越多个逻辑步骤或在特定条件才释放的资源,应使用显式的request()和release()。修改后的代码混合使用了这两种方式,以适应不同的资源管理需求。
part_generator进程负责在模拟环境中周期性地创建新的零件进程。
def part_generator(self):
for p_Id in itertools.count(): # 无限生成零件ID
yield self.env.timeout(1) # 每隔1秒生成一个新零件
self.env.process(self.make_part(p_Id)) # 启动一个新零件的加工进程这里使用itertools.count()生成无限递增的零件ID,并通过self.env.timeout(1)控制零件的生成间隔。self.env.process()则将make_part方法注册为一个SimPy进程,使其在模拟环境中并发运行。
run方法启动模拟,并指定模拟的持续时间。
def run(self, t_sim=g.t_sim): # 运行方法启动实体生成器并告诉SimPy运行环境
self.env.run(until=t_sim) # 运行模拟直到指定时间# 运行模拟
print("模拟开始...")
my_model = cell_model()
my_model.run()
print("模拟结束。")SimPy为建模复杂的工厂生产线提供了强大的工具。通过将生产过程分解为离散事件和资源交互,我们可以构建出能够反映现实世界动态的模拟模型。正确的资源管理(请求与释放)是确保模型准确性和避免死锁的关键。通过详细的日志记录和逐步调试,可以有效地识别和解决模拟中出现的问题,从而为生产优化提供有价值的洞察。
以上就是使用SimPy进行工厂生产线离散事件模拟建模教程的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号