Deterministic Policy Gradient (DPG) is one of the most widely used methods for continuous control. DPG is an Actor-Critic method with two networks: a policy network (the actor) and a value network (the critic). The policy network controls the agent: given a state s it outputs an action a. The value network does not control the agent; it only scores the action a taken in state s, and that score is used to guide improvements to the policy network.
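As a brief reminder (standard DPG theory, stated here for context rather than taken from this article's code), the critic guides the actor through the deterministic policy gradient:

∇θ J(θ) = E_s[ ∇a Q(s, a; w) |_{a = μ(s; θ)} · ∇θ μ(s; θ) ]

In practice this is implemented below by minimising the loss -Q(s, μ(s; θ); w), i.e. actor_loss = -critic(state, actor(state)).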
        
The inverted pendulum is a classic continuous-control problem: the pendulum starts in a random position, and the goal is to swing it up and keep it upright. The state space has 3 dimensions and the action space has 1 dimension (the action is continuous and bounded).
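As a quick sanity check of those dimensions (a minimal sketch, assuming the same gym version used below with the Pendulum-v0 environment available):

import gym

env = gym.make('Pendulum-v0')
# observation: [cos(theta), sin(theta), angular velocity]
print(env.observation_space.shape)                  # (3,)
# action: a single torque value bounded to [-2, 2]
print(env.action_space.shape)                       # (1,)
print(env.action_space.low, env.action_space.high)  # [-2.] [2.]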
We use deep neural networks with DPG, i.e. the DDPG (Deep Deterministic Policy Gradient) method, and train it to obtain good rewards in the inverted pendulum environment.
import gym
import paddle
from itertools import count
import numpy as np
from collections import deque
import random
import time
from visualdl import LogWriter
DDPG is closely related to Q-learning and can be viewed as deep Q-learning for a continuous action space.
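To make the connection concrete (standard formulations, not specific to this implementation): DQN builds its TD target with a maximum over a discrete action set,

y = r + γ · max_a Q(s', a),

whereas a continuous action space cannot be enumerated, so DDPG replaces the max with the deterministic actor's output:

y = r + γ · Q(s', μ(s')).

This is exactly the TD target computed in the training loops below.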
        
class Critic(paddle.nn.Layer):
    def __init__(self):
        super(Critic,self).__init__()
        # the state space has 3 dimensions
        self.fc1=paddle.nn.Linear(3,256)

        # the 1-dimensional action is concatenated here, hence the +1
        self.fc2=paddle.nn.Linear(256+1,128)
        self.fc3=paddle.nn.Linear(128,1)
        self.relu=paddle.nn.ReLU()

    def forward(self,x,a):
        x=self.relu(self.fc1(x))
        x=paddle.concat((x,a),axis=1)
        x=self.relu(self.fc2(x))
        x=self.fc3(x)
        return x

To make the DDPG policy explore better, noise is added to its actions during training. The original DDPG paper suggested time-correlated OU noise, but more recent results show that uncorrelated zero-mean Gaussian noise works just as well. The latter is simpler and is the preferred choice here.
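For comparison, here is a minimal, illustrative sketch of the time-correlated OU (Ornstein-Uhlenbeck) process mentioned above; it is not used anywhere in this article, whose Actor below adds uncorrelated Gaussian noise via paddle.distribution.Normal instead. The parameter values are common defaults, not taken from the DDPG paper verbatim.

class OUNoise:
    # Ornstein-Uhlenbeck process: dx = theta * (mu - x) * dt + sigma * sqrt(dt) * N(0, 1)
    def __init__(self, mu=0.0, theta=0.15, sigma=0.2, dt=1e-2):
        self.mu, self.theta, self.sigma, self.dt = mu, theta, sigma, dt
        self.x = mu

    def reset(self):
        self.x = self.mu

    def sample(self):
        dx = self.theta * (self.mu - self.x) * self.dt \
             + self.sigma * np.sqrt(self.dt) * np.random.randn()
        self.x = self.x + dx
        return self.x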
        
class Actor(paddle.nn.Layer):
    def __init__(self,is_train=True):
        super(Actor,self).__init__()

        self.fc1=paddle.nn.Linear(3,256)
        self.fc2=paddle.nn.Linear(256,128)
        self.fc3=paddle.nn.Linear(128,1)
        self.relu=paddle.nn.ReLU() # max(0,x)
        self.tanh=paddle.nn.Tanh() # hyperbolic tangent: (e^x - e^(-x)) / (e^x + e^(-x))
        self.noisy=paddle.distribution.Normal(0,0.2)
        self.is_train=is_train

    def forward(self,x):
        x=self.relu(self.fc1(x))
        x=self.relu(self.fc2(x))
        x=self.tanh(self.fc3(x))
        return x

    def select_action(self,epsilon,state):
        state=paddle.to_tensor(state,dtype='float32').unsqueeze(0)
        # paddle.no_grad() disables gradient tracking in dynamic graph mode;
        # every tensor computed inside has stop_gradient=True.
        with paddle.no_grad():
            # squeeze() drops the size-1 dimensions from a tensor's shape;
            # the Gaussian noise sample is a 1-D tensor, so squeeze(0) makes it a scalar.
            action=self.forward(state).squeeze()+self.is_train*epsilon*self.noisy.sample([1]).squeeze(0)
        # paddle.clip clamps every element of the input into [min, max].
        # The action space is [-2, 2], so the tanh output is clipped to [-1, 1] and scaled by 2.
        return 2*paddle.clip(action,-1,1).numpy()

This is the agent's past experience. For the algorithm to behave stably, the replay buffer should be large enough to hold a broad range of experiences.
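As a quick illustration of how the replay buffer is used (a purely hypothetical snippet; the Memory class it relies on is defined right below, and the (state, next_state, action, reward) tuple layout matches what the training loops store):

memory = Memory(memory_size=50000)
fake_transition = (np.zeros(3), np.zeros(3), 0.0, 0.0)   # (state, next_state, action, reward)
for _ in range(100):
    memory.add(fake_transition)
print(memory.size())                          # 100
batch = memory.sample(32, continuous=False)   # 32 transitions drawn uniformly at random
states, next_states, actions, rewards = zip(*batch)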
 
        
class Memory(object):

    def __init__(self,memory_size):
        self.memory_size=memory_size
        self.buffer=deque(maxlen=self.memory_size)

    # Add an experience. Experiences are stored in a deque (double-ended queue)
    # with a fixed maximum length, so once it is full, adding a new element
    # automatically evicts the oldest one from the front.
    def add(self,experience):
        self.buffer.append(experience)

    def size(self):
        return len(self.buffer)

    # continuous=True means take batch_size consecutive experiences
    def sample(self, batch_size, continuous=True):
        # never ask for more experiences than the buffer currently holds
        if batch_size>len(self.buffer):
            batch_size=len(self.buffer)
        if continuous:
            # random.randint(a,b) returns a random integer in [a,b]
            rand=random.randint(0,len(self.buffer)-batch_size)
            return [self.buffer[i] for i in range(rand,rand+batch_size)]
        else:
            # numpy.random.choice(a, size=None, replace=True, p=None)
            # if a is an array, sample from it; if a is an integer, sample from range(a)
            # size is the number of samples to draw
            # replace=True allows repeated samples; replace=False does not
            # p gives the sampling probability of each element; None means uniform
            indexes=np.random.choice(np.arange(len(self.buffer)),size=batch_size,replace=False)
            return [self.buffer[i] for i in indexes]

    def clear(self):
        self.buffer.clear()

env=gym.make('Pendulum-v0')
actor=Actor()
critic=Critic()
critic_optim=paddle.optimizer.Adam(parameters=critic.parameters(),learning_rate=3e-5)
actor_optim=paddle.optimizer.Adam(parameters=actor.parameters(),learning_rate=1e-5)
explore=50000
epsilon=1      # controls the scale of the exploration noise added to actions
gamma=0.99
tau=0.001
memory_replay=Memory(50000)
begin_train=False
batch_size=32
learn_steps=0
epochs=100
writer=LogWriter('./logs')
        
s_time=time.time()
for epoch in range(0,epochs):
    start_time=time.time()
    state=env.reset()
    episode_reward=0 # accumulates the total reward of this epoch
    # each epoch runs for 2000 time steps
    for time_step in range(2000):
        # step 1: the policy network picks an action from the current state
        action=actor.select_action(epsilon,state)
        next_state,reward,done,_=env.step([action])
        episode_reward+=reward
        # The raw Pendulum-v0 reward is roughly in [-16.3, 0];
        # shifting and scaling maps it approximately into [-1, 1].
        reward=(reward+8.2)/8.2
        memory_replay.add((state,next_state,action,reward))
        # only start learning once the replay buffer holds enough experiences
        if memory_replay.size()>1280:
            learn_steps+=1
            if not begin_train:
                print('train begin!')
                begin_train=True
            
            # continuous=False: sample non-consecutive (random) experiences
            experiences=memory_replay.sample(batch_size,False)
            batch_state,batch_next_state,batch_action,batch_reward=zip(*experiences)
            batch_state=paddle.to_tensor(batch_state,dtype='float32')
            batch_next_state=paddle.to_tensor(batch_next_state,dtype='float32')
            batch_action=paddle.to_tensor(batch_action,dtype='float32').unsqueeze(1)
            batch_reward=paddle.to_tensor(batch_reward,dtype='float32').unsqueeze(1)

            # Build the TD target without tracking gradients:
            # the policy network predicts the next action a_{t+1},
            # and the value network scores (s_{t+1}, a_{t+1}).
            with paddle.no_grad():
                at1=actor(batch_next_state)
                qt1=critic(batch_next_state,at1)
                # TD target: y = r_t + gamma * Q(s_{t+1}, a_{t+1})
                Q_target=batch_reward+gamma*qt1

            # TD error: mean squared error between the critic's prediction
            # Q(s_t, a_t) and the TD target (reduction='mean' is the default).
            # Note: the prediction must be computed outside no_grad so that
            # gradients can flow back into the critic.
            qt=critic(batch_state,batch_action)
            critic_loss=paddle.nn.functional.mse_loss(qt,Q_target)
            # update the value network
            critic_optim.clear_grad()
            critic_loss.backward()
            critic_optim.step()
            writer.add_scalar('critic loss',critic_loss.numpy(),learn_steps)

            # Update the policy network: for a given state, the actor should
            # produce an action that the critic scores highly, so we maximise
            # the critic's output (minimise its negative).
            critic.eval()
            actor_loss=-critic(batch_state,actor(batch_state))
            actor_loss=actor_loss.mean()
            actor_optim.clear_grad()
            actor_loss.backward()
            actor_optim.step()
            critic.train()
            writer.add_scalar('actor loss',actor_loss.numpy(),learn_steps)
        # epsilon decays gradually
        if epsilon>0:
            epsilon-=1/explore
        state=next_state
     
    writer.add_scalar('episode reward',episode_reward/2000,epoch)    
    if epoch%10==0:
        print("Epoch:{}, episode reward is {:.3f}, use time is {:.3f}s ".format(epoch,episode_reward/2000,time.time()-start_time))
print("************ all time is {:.3f} h:".format((time.time()-s_time)/3600))

train begin!
Epoch:0, episode reward is -6.058, use time is 4.891s
Epoch:10, episode reward is -6.146, use time is 11.346s
Epoch:20, episode reward is -6.144, use time is 11.830s
Epoch:30, episode reward is -7.270, use time is 11.998s
Epoch:40, episode reward is -6.148, use time is 11.932s
Epoch:50, episode reward is -7.335, use time is 12.149s
Epoch:60, episode reward is -6.411, use time is 12.301s
Epoch:70, episode reward is -6.135, use time is 12.365s
Epoch:80, episode reward is -6.768, use time is 12.136s
Epoch:90, episode reward is -6.219, use time is 12.191s
************ all time is 0.329 h:
Here we redefine all of the parameters, even though they were defined above, because some of them are updated during training (their values have changed); redefining them in one place keeps this run self-contained.
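What changes in this second version is that the TD target is built from a separate pair of target networks (a standard DDPG construction, spelled out here for clarity):

y = r + γ · Q_target(s', μ_target(s')).

Because the bootstrapped term no longer comes from the networks currently being updated, the overestimation caused by a network bootstrapping on its own rapidly changing estimates is reduced.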
actor=Actor()
critic=Critic()
critic_optim=paddle.optimizer.Adam(parameters=critic.parameters(),learning_rate=3e-5)
actor_optim=paddle.optimizer.Adam(parameters=actor.parameters(),learning_rate=1e-5)

# target networks, used to mitigate the overestimation caused by bootstrapping
actor_target=Actor()
critic_target=Critic()
# initialise the targets as copies of the online networks (standard DDPG practice)
actor_target.set_state_dict(actor.state_dict())
critic_target.set_state_dict(critic.state_dict())

explore=50000
epsilon=1      # controls the scale of the exploration noise added to actions
gamma=0.99
tau=0.001
memory_replay=Memory(50000)
begin_train=False
batch_size=32
learn_steps=0
epochs=100
twriter=LogWriter('./newlogs')

The target network parameters are updated with the following soft update rule:
θ_target ← τ · θ_online + (1 − τ) · θ_target,  with τ = 0.001
def soft_update(target,source,tau):
    # zip() pairs up the parameters of the two networks one by one
    for target_param,param in zip(target.parameters(),source.parameters()):
        target_param.set_value(target_param*(1.0-tau)+param*tau)
        
s_time=time.time()
k=0
for epoch in range(0,epochs):
    start_time=time.time()
    state=env.reset()
    episode_reward=0 # accumulates the total reward of this epoch
    # each epoch runs for 2000 time steps
    for time_step in range(2000):
        # step 1: the policy network picks an action from the current state
        action=actor.select_action(epsilon,state)
        next_state,reward,done,_=env.step([action])
        episode_reward+=reward
        # rescale the raw reward (roughly [-16.3, 0]) to approximately [-1, 1]
        reward=(reward+8.2)/8.2
        memory_replay.add((state,next_state,action,reward))
        # only start learning once the replay buffer holds enough experiences
        if memory_replay.size()>1280:
            learn_steps+=1
            if not begin_train:
                print('train begin!')
                begin_train=True
            
            # continuous=False: randomly sample non-consecutive tuples
            # (state, next_state, action, reward) from the replay buffer
            experiences=memory_replay.sample(batch_size,False)
            batch_state,batch_next_state,batch_action,batch_reward=zip(*experiences)
            batch_state=paddle.to_tensor(batch_state,dtype='float32')
            batch_next_state=paddle.to_tensor(batch_next_state,dtype='float32')
            batch_action=paddle.to_tensor(batch_action,dtype='float32').unsqueeze(1)
            batch_reward=paddle.to_tensor(batch_reward,dtype='float32').unsqueeze(1)

            # The target policy network predicts the next action a_{t+1} from s_{t+1};
            # the target value network then scores (s_{t+1}, a_{t+1}).
            # TD target: y = r_t + gamma * Q_target(s_{t+1}, a_{t+1})
            with paddle.no_grad():
                Q_next=critic_target(batch_next_state,actor_target(batch_next_state))
                Q_target=batch_reward+gamma*Q_next
            # The online value network predicts q_t from (s_t, a_t); the critic loss
            # is the mean squared TD error between q_t and the TD target y
            # (reduction='mean' is the default).
            critic_loss=paddle.nn.functional.mse_loss(critic(batch_state,batch_action),Q_target)
            # 更新价值网络
            critic_optim.clear_grad()
            critic_loss.backward()
            critic_optim.step()
            twriter.add_scalar('critic loss',critic_loss.numpy(),learn_steps)

            # delayed update: refresh the actor and the target networks only once
            # every few critic updates
            if k==5:
                # Update the policy network: the actor is evaluated by the mean of
                # the critic's scores for its actions, and we maximise that value,
                # i.e. for a given state the actor should produce an action the
                # critic rates highly.
                critic.eval()
                actor_loss=-critic(batch_state,actor(batch_state))
                actor_loss=actor_loss.mean()
                actor_optim.clear_grad()
                actor_loss.backward()
                actor_optim.step()
                critic.train()
                twriter.add_scalar('actor loss',actor_loss.numpy(),learn_steps)
                # softly update the target networks
                soft_update(actor_target,actor,tau)
                soft_update(critic_target,critic,tau)
                k=0
            else:
                k+=1
        
        # epsilon decays gradually
        if epsilon>0:
            epsilon-=1/explore
        state=next_state
     
    twriter.add_scalar('episode reward',episode_reward/2000,epoch)    
    if epoch%10==0:
        print("Epoch:{}, episode reward is {:.3f}, use time is {:.3f}s ".format(epoch,episode_reward/2000,time.time()-start_time))
print("************ all time is {:.3f} h:".format((time.time()-s_time)/3600))

train begin!
Epoch:0, episode reward is -8.726, use time is 4.682s
Epoch:10, episode reward is -7.607, use time is 10.381s
Epoch:20, episode reward is -7.419, use time is 11.115s
Epoch:30, episode reward is -6.317, use time is 11.464s
Epoch:40, episode reward is -8.340, use time is 11.308s
Epoch:50, episode reward is -8.306, use time is 11.535s
Epoch:60, episode reward is -8.276, use time is 11.356s
Epoch:70, episode reward is -1.401, use time is 11.539s
Epoch:80, episode reward is -8.145, use time is 11.346s
Epoch:90, episode reward is -0.076, use time is 11.321s
************ all time is 0.308 h:
 
        
In the later stages of training the reward reaches reasonably good values, although it fluctuates a lot. Still, compared with the model without target networks, the results are clearly better. Further tuning is left for future work.