这段内容基于Paddle框架实现了Advantage Actor-Critic(A2C)算法,以CartPole-v0环境为例,涵盖环境配置、状态与动作空间获取,定义Actor和Critic网络,计算TD目标,还展示了含详细输出的训练过程,包括状态处理、动作采样、奖励收集及参数更新等步骤。
☞☞☞AI 智能聊天, 问答助手, AI 智能搜索, 免费无限量使用 DeepSeek R1 模型☜☜☜

Advantage Actor-Critci是带基线的策略梯度方法,其中Advantage代表的是优势函数。可以把Advantage Actor-Critci看做Actor-Critci的改进版本。二者区别如下:
import gymimport osimport sysfrom itertools import countimport paddlefrom paddle.distribution import Categorical
print('python version',sys.version)print('paddle version',paddle.__version__)python version 3.7.4 (default, Aug 13 2019, 20:35:49) [GCC 7.3.0] paddle version 2.2.1
device=paddle.get_device()print(device)
cpu
env=gym.make('CartPole-v0')print(env)<TimeLimit<CartPoleEnv<CartPole-v0>>>
关于 observation_space 与 action_space 代表的到底是什么内容,可以参考CSDN博文:https://wuxian.blog.csdn.net/article/details/89576003
目前我的理解:
state_=env.observation_spaceprint(state_) state_size=state_.shape[0]print(state_size) action_=env.action_spaceprint(action_) action_size=action_.nprint(action_size)
Box(4,) 4 Discrete(2) 2
lr=0.001
截图来自 王树森 老师的书籍《深度强化学习》,Github直达链接:https://github.com/wangshusen/DRL
**如果s是张量,那么使用卷积网络捕获特征向量;如果s是向量,那么使用全连接网络。**这句话摘抄于上述书籍中,个人认为其所述张量应为:大于1维的张量;注意区分标量,向量,矩阵,张量,个人理解如下:
在(初中)高中数学中我们也学过向量,比如计算投影,这时向量是有方向的,但是在深度学习中,我理解的方向与此是有点区别的,即不用考虑方向。
class Actor(paddle.nn.Layer):
def __init__(self,state_size,action_size):
super(Actor,self).__init__()
self.state_size=state_size
self.action_size=action_size
self.l1=paddle.nn.Linear(self.state_size,128)
self.l2=paddle.nn.Linear(128,256)
self.l3=paddle.nn.Linear(256,self.action_size)
self.relu=paddle.nn.ReLU()
def forward(self,state):
out=self.relu(self.l1(state))
out=self.relu(self.l2(out))
out=self.l3(out) # 根据动作的概率(加和为1),生成一个类别分布.paddle框架目前提供了三种分类函数【类别分类,正态分布,均匀分布】
distribution=Categorical(paddle.nn.functional.softmax(out,axis=-1)) return distribution
class Critic(paddle.nn.Layer):
def __init__(self,state_size,action_size):
super(Critic,self).__init__()
self.state_size=state_size
self.action_size=action_size
self.l1=paddle.nn.Linear(self.state_size,128)
self.l2=paddle.nn.Linear(128,256)
self.l3=paddle.nn.Linear(256,1)
self.relu=paddle.nn.ReLU() def forward(self,state):
# paddle.nn.ReLU() 与paddle.nn.functional.relu()的区别是:前者是面向对象,是class,在类的fordward中调用了后者;后者是面向过程,是def。
#out=paddle.nn.functional.relu(self.l1(state)) # relu(x)=max(0,x)
#out=paddle.nn.functional.relu(self.l2(out))
out=self.relu(self.l1(state))
out=self.relu(self.l2(out))
value=self.l3(out) return valueactor_path='model/actor.pdparams'critic_path='model/critic.pdparams'
看可以看王树森的书,但是感觉不太一样,理论与实践存在一些差别,但是总体思想是一致的。
def compute_returns(next_value,rewards,masks,gamma=0.99):
# 相当于n+1时刻的价值
R=next_value
returns=[] # masks=[1,1,1,1,1,...,1,1,1,0]
# rewards=[r0,r1,r2,r3,r4,...,rn-3,rn-2,rn-1,rn]
# 倒序
for step in reversed(range(len(rewards))): # TD目标的计算??
R=rewards[step]+gamma*R*masks[step]
returns.insert(0,R) # returns=[t0,t1,t2,t3,t4,...,tn-3,tn-2,tn-1,tn]
return returns带有很多输出,方便了解每一行代码在做什么事情。 3.3与3.4内容是一样的,只不过3.3用来理解每一步的输出;3.4用来训练,保证输出的简洁。
def trainIters1(actor,critic,n_iters):
# 定义两个网络的优化器
optA=paddle.optimizer.Adam(lr,parameters=actor.parameters())
optC=paddle.optimizer.Adam(lr,parameters=critic.parameters()) for iter in range(n_iters):
state=env.reset() # 环境初始化 state形如[ 0.04700963 -0.0149178 0.01601383 0.03912796]
print(state)
log_probs=[] # 对数概率密度函数
rewards=[] # 奖励列表
values=[] # 价值,critic网络的输出
masks=[] # done or not done
entropy=[] # 交叉熵
env.reset() # 这里为什么又要重置环境??
# python的迭代器 count(初值=0,步长=1)
for i in count(): # env.render()
print('*************************** /n/n/n') print('i = ',i) # state转为paddle.tensor,不指定place,根据环境自行判断
state=paddle.to_tensor(state,dtype='float32') print(state)
dist,value=actor(state),critic(state) print('actor distribution:',dist) # 这是一个分类分布(class)
print('critic value:',value) # 从分布中进行采样(具体如何采样尚且不知,根据概率进行抽样??)
action=dist.sample([1]) print('action is:',action,type(action)) # 环境执行一个时间步长,得到下一个状态,奖励,done,以及info(用不到,这里返回是空的字典)
# env.step()的输入格式是什么?: <class 'numpy.ndarray'>
# 将tensor拷贝到CPU上,不懂意图是什么,如果decive是在GPU上,则会有作用。
print('action.cpu()=',action.cpu()) # 删除axis=0上尺度为1的维度
print('action.cpu().squeeze(0)=',action.cpu().squeeze()) # 获取数值
print('action.cpu().squeeze(0).numpy()=',action.cpu().squeeze().numpy(),type(action.cpu().squeeze().numpy()))
next_state,reward,done,info=env.step(action.cpu().squeeze(0).numpy()) print('next_state=',next_state) print('reward=',reward) print('done=',done) print('info=',info,type(info)) # 计算当前动作的对数概率密度函数
log_prob=dist.log_prob(action) print('log_prob = ',log_prob) # 加入到对数概率密度函数列表中
log_probs.append(log_prob) # 将价值(critic)网络的输出加入到价值列表中
values.append(value) # 将当前奖励加入到奖励列表中
rewards.append(paddle.to_tensor([reward],dtype='float32')) # 将当前的done标志加入到masks列表中,False=0,
masks.append(paddle.to_tensor([1-done],dtype='float32')) print('1-done = ',1-done)
state=next_state if done: if iter%10==0: # score 就是小车坚持了多少个时间步
print("Iteration:{},score:{}".format(iter,i)) break
break #如果想要查看这个for循环每一步的输入,打开这个break
# end for count() 小车运行一次,也就是进行一句游戏
print()
next_state=paddle.to_tensor(next_state,dtype='float32') # 让critic网络根据next_state预测next_value
next_value=critic(next_state) print('next_value = ',next_value)
returns=compute_returns(next_value,rewards,masks) print('returns == ',returns) print('len returns = ',len(returns))
log_probs=paddle.concat(log_probs) # detach() 返回一个新的Tensor,从当前计算图分离。
# 作用是什么???
returns=paddle.concat(returns).detach()
values=paddle.concat(values) print('concat log_probs: ',log_probs) print('concat returns: ',returns) print('concat values: ',values) # 负的 TD 误差 ,是让values接近returns,TD误差是 values-returns
advantage=returns-values # actor的loss计算
# critic的loss计算
actor_loss=-(log_probs*advantage.detach()).mean()
critic_loss=advantage.pow(2).mean() print('actor_loss: ',actor_loss) print('critic_loss: ',critic_loss)
# 更新参数,完成一句游戏更新一次
# 梯度反向传播,注意actor网络是做梯度上升;critic网络是做梯度下降
optA.clear_grad()
optC.clear_grad()
actor_loss.backward()
critic_loss.backward()
optA.step()
optC.step() break # 看一个iter的输出,注意打开这个break
paddle.save(actor.state_dict(),actor_path)
paddle.save(critic.state_dict(),critic_path)
env.close print("111111111111111-------overover****************************************")将很多输出注释掉,输出界面的简洁。3.3与3.4内容是一样的,只不过3.3用来理解每一步的输出;3.4用来训练,保证输出的简洁。
def trainIters2(actor,critic,n_iters):
# 定义两个网络的优化器
optA=paddle.optimizer.Adam(lr,parameters=actor.parameters())
optC=paddle.optimizer.Adam(lr,parameters=critic.parameters()) for iter in range(n_iters):
state=env.reset() # 环境初始化 state形如[ 0.04700963 -0.0149178 0.01601383 0.03912796]
#print(state)
log_probs=[] # 对数概率密度函数
rewards=[] # 奖励列表
values=[] # 价值,critic网络的输出
masks=[] # done or not done
entropy=[] # 交叉熵
env.reset() # 这里为什么又要重置环境??
# python的迭代器 count(初值=0,步长=1)
for i in count(): # env.render() # 在线运行好像不支持
#print('*************************** /n/n/n')
#print('i = ',i)
# state转为paddle.tensor,不指定place,根据环境自行判断
state=paddle.to_tensor(state,dtype='float32') #print(state)
dist,value=actor(state),critic(state) #print('actor distribution:',dist) # 这是一个分类分布(class)
#print('critic value:',value)
# 从分布中进行采样(具体如何采样尚且不知,根据概率进行抽样??)
action=dist.sample([1]) #print('action is:',action,type(action))
# 环境执行一个时间步长,得到下一个状态,奖励,done,以及info(用不到,这里返回是空的字典)
# env.step()的输入格式是什么?: <class 'numpy.ndarray'>
# 将tensor拷贝到CPU上,不懂意图是什么,如果decive是在GPU上,则会有作用。
#print('action.cpu()=',action.cpu())
# 删除axis=0上尺度为1的维度
#print('action.cpu().squeeze(0)=',action.cpu().squeeze())
# 获取数值
#print('action.cpu().squeeze(0).numpy()=',action.cpu().squeeze().numpy(),type(action.cpu().squeeze().numpy()))
next_state,reward,done,info=env.step(action.cpu().squeeze(0).numpy()) # print('next_state=',next_state)
# print('reward=',reward)
# print('done=',done)
# print('info=',info,type(info))
# 计算当前动作的对数概率密度函数
log_prob=dist.log_prob(action) #print('log_prob = ',log_prob)
# 加入到对数概率密度函数列表中
log_probs.append(log_prob) # 将价值(critic)网络的输出加入到价值列表中
values.append(value) # 将当前奖励加入到奖励列表中
rewards.append(paddle.to_tensor([reward],dtype='float32')) # 将当前的done标志加入到masks列表中,False=0,
masks.append(paddle.to_tensor([1-done],dtype='float32')) #print('1-done = ',1-done)
state=next_state if done: if iter%10==0: # score 就是小车坚持了多少个时间步
print("Iteration:{},score:{}".format(iter,i)) break
# break #如果想要查看这个for循环每一步的输入,打开这个注释
# end for count() 小车运行一次,也就是进行一句游戏
#print()
next_state=paddle.to_tensor(next_state,dtype='float32') # 让critic网络根据next_state预测next_value
next_value=critic(next_state) #print('next_value = ',next_value)
returns=compute_returns(next_value,rewards,masks) #print('returns == ',returns)
#print('len returns = ',len(returns))
log_probs=paddle.concat(log_probs) # detach() 返回一个新的Tensor,从当前计算图分离。
# 作用是什么???
returns=paddle.concat(returns).detach()
values=paddle.concat(values) # print('concat log_probs: ',log_probs)
# print('concat returns: ',returns)
# print('concat values: ',values)
# 负的 TD 误差 ,是让values接近returns,TD误差是 values-returns
advantage=returns-values # actor的loss计算
# critic的loss计算
actor_loss=-(log_probs*advantage.detach()).mean()
critic_loss=advantage.pow(2).mean()
# 更新参数,完成一句游戏更新一次
# 梯度反向传播,注意actor网络是做梯度上升;critic网络是做梯度下降
optA.clear_grad()
optC.clear_grad()
actor_loss.backward()
critic_loss.backward()
optA.step()
optC.step() # break
paddle.save(actor.state_dict(),actor_path)
paddle.save(critic.state_dict(),critic_path)
env.close print("222222222222222-------overover****************************************")def main():
actor=Actor(state_size,action_size)
critic=Critic(state_size,action_size) # if os.path.exists(actor_path):
# amodel_state_dict=paddle.load(actor_path)
# actor.set_state_dict(amodel_state_dict)
# print("load actor model from file")
# if os.path.exists(critic_path):
# cmodel_state_dict=paddle.load(critic_path)
# critic.set_state_dict(cmodel_state_dict)
# print("load critic model from file")
# 在这里决定是用3.3 还是 用3.4
trainIters1(actor,critic,n_iters=200) #trainIters2(actor,critic,n_iters=200)main()[-0.01245204 -0.0177881 -0.0447371 0.01200596]
*************************** /n/n/n
i = 0
Tensor(shape=[4], dtype=float32, place=CPUPlace, stop_gradient=True,
[-0.01245204, -0.01778810, -0.04473710, 0.01200596])
actor distribution: <paddle.distribution.Categorical object at 0x7f01d1996a90>
critic value: Tensor(shape=[1], dtype=float32, place=CPUPlace, stop_gradient=False,
[0.00308495])
action is: Tensor(shape=[1], dtype=int64, place=CPUPlace, stop_gradient=False,
[0]) <class 'paddle.Tensor'>
action.cpu()= Tensor(shape=[1], dtype=int64, place=CPUPlace, stop_gradient=False,
[0])
action.cpu().squeeze(0)= Tensor(shape=[], dtype=int64, place=CPUPlace, stop_gradient=False,
0)
action.cpu().squeeze(0).numpy()= 0 <class 'numpy.ndarray'>
next_state= [-0.0036292 -0.2412104 -0.02740825 0.25513875]
reward= 1.0
done= False
info= {} <class 'dict'>
log_prob = Tensor(shape=[1], dtype=float32, place=CPUPlace, stop_gradient=False,
[-0.69534212])
1-done = 1
next_value = Tensor(shape=[1], dtype=float32, place=CPUPlace, stop_gradient=False,
[-0.02449672])
returns == [Tensor(shape=[1], dtype=float32, place=CPUPlace, stop_gradient=False,
[0.97574824])]
len returns = 1
concat log_probs: Tensor(shape=[1], dtype=float32, place=CPUPlace, stop_gradient=False,
[-0.69534212])
concat returns: Tensor(shape=[1], dtype=float32, place=CPUPlace, stop_gradient=True,
[0.97574824])
concat values: Tensor(shape=[1], dtype=float32, place=CPUPlace, stop_gradient=False,
[0.00308495])
actor_loss: Tensor(shape=[1], dtype=float32, place=CPUPlace, stop_gradient=False,
[0.67633373])
critic_loss: Tensor(shape=[1], dtype=float32, place=CPUPlace, stop_gradient=False,
[0.94607389])
111111111111111-------overover****************************************以上就是【强化】Advantage Actor-Critic (A2C):强化学习之摆车的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号