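Implementing PPO from Scratch to Control CartPole: A Hands-On PyTorch and Tuning Guide
In reinforcement learning, the inverted pendulum (CartPole) is often called the "Hello World" of the field. The environment looks simple, yet it exercises the core concepts: state observation, action selection, and the reward mechanism. This article implements PPO (Proximal Policy Optimization) in PyTorch and covers the full technical path for control problems, from environment setup to model deployment.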
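1. Environment Setup and Problem Analysis
1.1 Installing and Verifying Gym
OpenAI Gym provides a standardized interface to reinforcement learning environments. Installation takes a single command:

```bash
pip install gym==0.21.0
```

The code in this article uses the 0.21 API, in which env.reset() returns only the observation and env.step() returns four values; gym 0.26 and later changed both signatures. Verify that the installation works:

```python
import gym

env = gym.make('CartPole-v1')   # more commonly used than v0
print(env.observation_space)    # a Box space of shape (4,)
print(env.action_space)         # Discrete(2)
```

The CartPole state space consists of four continuous variables:
- Cart position (x)
- Cart velocity (v)
- Pole angle (θ)
- Pole angular velocity (ω)
The action space is discrete:
- 0: push the cart to the left
- 1: push the cart to the right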
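To make the observation layout concrete, here is a small sketch that names the four components and checks the failure condition. The helper names (`CartPoleState`, `is_failure`, `ACTION_NAMES`) are hypothetical and not part of Gym; the limits are the termination thresholds of Gym's CartPole (cart position beyond ±2.4, pole angle beyond ±12 degrees).

```python
import math
from typing import NamedTuple, Sequence

# Termination limits used by Gym's CartPole environment.
X_LIMIT = 2.4                      # cart position limit
THETA_LIMIT = 12 * math.pi / 180   # 12 degrees, about 0.2095 rad

# Meaning of the two discrete actions.
ACTION_NAMES = {0: "push left", 1: "push right"}


class CartPoleState(NamedTuple):   # hypothetical helper, not part of Gym
    x: float       # cart position
    v: float       # cart velocity
    theta: float   # pole angle in radians
    omega: float   # pole angular velocity


def is_failure(obs: Sequence[float]) -> bool:
    """Return True if the observation lies outside the termination limits."""
    s = CartPoleState(*obs)
    return abs(s.x) > X_LIMIT or abs(s.theta) > THETA_LIMIT
```

Velocity and angular velocity never end an episode on their own; they only matter through how quickly they push position and angle toward the limits.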
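1.2 Fixing Common Environment Issues
If you run into a libiomp5md.dll conflict (a duplicate OpenMP runtime, typically on Windows), add the following workaround at the very top of the script, before any other imports:

```python
import os

os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
```

If Box2D-related errors appear, an extra install may be needed. CartPole itself is a classic-control environment and does not depend on Box2D, so this only matters if you also run Box2D environments such as LunarLander:

```bash
pip install "gym[box2d]"
```

2. Core PPO Implementation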
2.1 Network Architecture
PPO uses two neural networks: an Actor (policy network) and a Critic (value network). A PyTorch implementation:

```python
import torch
import torch.nn as nn
import torch.nn.functional as F


class Actor(nn.Module):
    """Policy network: maps a state to a probability for each action."""

    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return F.softmax(self.fc3(x), dim=-1)  # action probabilities


class Critic(nn.Module):
    """Value network: maps a state to a scalar state-value estimate."""

    def __init__(self, state_dim, hidden_dim=64):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)  # shape (..., 1)
```

2.2 Trajectory (Rollout) Buffer
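PPO is on-policy: it stores the trajectories collected by the current policy, reuses them for several gradient updates, and then discards them. Unlike the replay buffer of off-policy methods, this buffer therefore has to be emptied (here by resetting ptr to 0) after every policy update:

```python
import numpy as np


class PPOBuffer:
    def __init__(self, buffer_size, state_dim):
        self.states = np.zeros((buffer_size, state_dim))
        self.actions = np.zeros(buffer_size, dtype=np.int32)
        self.rewards = np.zeros(buffer_size)
        self.values = np.zeros(buffer_size)
        self.log_probs = np.zeros(buffer_size)
        self.dones = np.zeros(buffer_size)
        self.ptr = 0                  # number of transitions stored so far
        self.max_size = buffer_size

    def store(self, state, action, reward, value, log_prob, done):
        # Wraps around and overwrites the oldest entry once the buffer is full.
        idx = self.ptr % self.max_size
        self.states[idx] = state
        self.actions[idx] = action
        self.rewards[idx] = reward
        self.values[idx] = value
        self.log_probs[idx] = log_prob
        self.dones[idx] = done
        self.ptr += 1

    def get(self):
        # Never slice past the allocated size, even if ptr has wrapped.
        n = min(self.ptr, self.max_size)
        return (
            self.states[:n],
            self.actions[:n],
            self.rewards[:n],
            self.values[:n],
            self.log_probs[:n],
            self.dones[:n],
        )
```

3. Training Loop and Hyperparameter Tuning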
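3.1 Core Training Loop
Each episode goes through four steps: interact with the environment, store the transitions, compute GAE advantage estimates, and update the policy:

```python
def train(env, agent, buffer, episodes=500, max_steps=200,
          gamma=0.99, gae_lambda=0.95, clip_ratio=0.2,
          train_iters=80, batch_size=64):
    # The buffer must hold at least max_steps transitions.
    rewards_history = []
    for ep in range(episodes):
        state = env.reset()  # gym 0.21: reset() returns only the observation
        ep_reward = 0
        for step in range(max_steps):
            # Interact with the environment
            action, value, log_prob = agent.get_action(state)
            next_state, reward, done, _ = env.step(action)
            # Store the transition
            buffer.store(state, action, reward, value, log_prob, done)
            state = next_state
            ep_reward += reward
            if done:
                break
        # Compute GAE advantage estimates
        states, actions, rewards, values, log_probs, dones = buffer.get()
        advantages = compute_gae(rewards, values, dones, gamma, gae_lambda)
        # Update the policy
        agent.update(states, actions, log_probs, advantages,
                     train_iters, batch_size, clip_ratio)
        # PPO is on-policy: drop this episode's data before collecting more
        buffer.ptr = 0
        rewards_history.append(ep_reward)
        print(f"Episode {ep+1}: Reward {ep_reward:.1f}")
    return rewards_history
```

3.2 Key Hyperparameters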
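| Parameter | Typical value | Role | Tuning advice |
|---|---|---|---|
| gamma | 0.99 | Discount factor | Higher values put more weight on long-term rewards |
| gae_lambda | 0.95 | GAE trade-off parameter | Controls the bias-variance trade-off of the advantage estimate |
| clip_ratio | 0.2 | Policy update limit | Prevents overly large policy updates |
| learning_rate | 3e-4 | Learning rate | Too large causes instability; too small slows convergence |
| batch_size | 64 | Minibatch size | Affects the stability of the gradient estimate |
| hidden_dim | 64 | Hidden layer width | Too small underfits; too large overfits |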
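The training loop in section 3.1 calls compute_gae without listing it. The following is a minimal sketch of Generalized Advantage Estimation that matches that call signature, not necessarily the author's version. The `last_value` argument is an assumption added here: it is the bootstrap value for the state after the final stored step, and the default of 0 is only correct when the rollout ends with a terminal state.

```python
import numpy as np


def compute_gae(rewards, values, dones, gamma=0.99, gae_lambda=0.95,
                last_value=0.0):
    """Generalized Advantage Estimation over one buffer of transitions.

    delta_t = r_t + gamma * V(s_{t+1}) * (1 - done_t) - V(s_t)
    A_t     = delta_t + gamma * lambda * (1 - done_t) * A_{t+1}
    """
    rewards = np.asarray(rewards, dtype=np.float64)
    values = np.asarray(values, dtype=np.float64)
    dones = np.asarray(dones, dtype=np.float64)

    advantages = np.zeros_like(rewards)
    next_value = last_value   # V(s_{t+1}) for the last stored step (assumed 0)
    next_adv = 0.0            # A_{t+1}, zero beyond the end of the buffer
    # Walk backwards so each step can reuse the advantage of its successor.
    for t in reversed(range(len(rewards))):
        not_done = 1.0 - dones[t]
        delta = rewards[t] + gamma * next_value * not_done - values[t]
        next_adv = delta + gamma * gae_lambda * not_done * next_adv
        advantages[t] = next_adv
        next_value = values[t]
    return advantages
```

With gae_lambda = 0 every advantage reduces to the one-step TD error (lower variance, more bias from the critic); with gae_lambda = 1 it becomes the discounted return minus the value baseline (less bias, more variance). The typical 0.95 sits between the two.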
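4. Model Deployment and Performance Optimization
4.1 Saving and Loading the Model
Save the model weights once training is finished:

```python
def save_model(agent, path):
    # Store both networks' weights in a single checkpoint file.
    torch.save({
        'actor_state_dict': agent.actor.state_dict(),
        'critic_state_dict': agent.critic.state_dict(),
    }, path)


def load_model(agent, path):
    # The agent must already hold networks with the same architecture.
    checkpoint = torch.load(path)
    agent.actor.load_state_dict(checkpoint['actor_state_dict'])
    agent.critic.load_state_dict(checkpoint['critic_state_dict'])
```

4.2 Live Visualization Test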
Use Matplotlib for live rendering (the IPython display calls assume a Jupyter notebook):

```python
import matplotlib.pyplot as plt
from IPython import display


def test_agent(env, agent, episodes=5):
    plt.figure(figsize=(10, 6))
    for ep in range(episodes):
        state = env.reset()
        img = plt.imshow(env.render(mode='rgb_array'))
        for step in range(200):
            action, _, _ = agent.get_action(state)
            state, _, done, _ = env.step(action)
            # Redraw the current frame in place
            img.set_data(env.render(mode='rgb_array'))
            plt.axis('off')
            display.display(plt.gcf())
            display.clear_output(wait=True)
            if done:
                break
```

4.3 Performance Optimization Tips
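Vectorized environments: use gym.vector to step several environments as one batch.

```python
from gym.vector import SyncVectorEnv

# SyncVectorEnv steps the copies one after another in the current process;
# AsyncVectorEnv runs them in subprocesses for true parallelism.
envs = SyncVectorEnv([lambda: gym.make('CartPole-v1') for _ in range(4)])
```

Automatic mixed precision: speeds up computation on CUDA GPUs (for networks as small as the ones above, the gain is likely small).

```python
from torch.cuda.amp import GradScaler, autocast

scaler = GradScaler()

# One optimization step; `optimizer` and `compute_loss` come from your own code.
optimizer.zero_grad()
with autocast():
    loss = compute_loss(...)
scaler.scale(loss).backward()
scaler.step(optimizer)
scaler.update()
```

Reward shaping: modify the original reward function.

```python
def shaped_reward(state, reward, done):
    x, v, theta, omega = state
    # Penalize pole angle and angular velocity
    return reward - 0.1 * abs(theta) - 0.01 * abs(omega)
```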
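In my own projects, I have found that convergence is most stable with a batch size of 64 to 128 and a GAE parameter of λ = 0.95. For an environment as simple as CartPole, 100 to 200 training episodes are usually enough to reach the maximum episode return (200 under the max_steps=200 cap used in the training loop; CartPole-v1 itself allows up to 500 steps per episode).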
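The training loop in section 3.1 also relies on agent.get_action and agent.update, which the article does not list. The sketch below shows one way to fill that gap using PPO's clipped surrogate objective. The class name `PPOAgent`, the two separate Adam optimizers, the advantage normalization, and the way critic targets are rebuilt are all assumptions of this sketch, not the author's code. It expects an actor that outputs action probabilities (as the Actor in section 2.1 does) and a critic that outputs one value per state.

```python
import numpy as np
import torch
from torch.distributions import Categorical


class PPOAgent:  # hypothetical name; the article does not show its agent
    def __init__(self, actor, critic, lr=3e-4):
        self.actor, self.critic = actor, critic
        self.actor_opt = torch.optim.Adam(actor.parameters(), lr=lr)
        self.critic_opt = torch.optim.Adam(critic.parameters(), lr=lr)

    @torch.no_grad()
    def get_action(self, state):
        """Sample an action; return (action, state value, log-probability)."""
        s = torch.as_tensor(np.asarray(state), dtype=torch.float32)
        dist = Categorical(probs=self.actor(s))
        action = dist.sample()
        return action.item(), self.critic(s).item(), dist.log_prob(action).item()

    def update(self, states, actions, old_log_probs, advantages,
               train_iters, batch_size, clip_ratio):
        s = torch.as_tensor(np.asarray(states), dtype=torch.float32)
        a = torch.as_tensor(np.asarray(actions), dtype=torch.int64)
        old_lp = torch.as_tensor(np.asarray(old_log_probs), dtype=torch.float32)
        adv = torch.as_tensor(np.asarray(advantages), dtype=torch.float32)

        with torch.no_grad():
            # Critic target = advantage + value under the not-yet-updated
            # critic, which equals the value stored during the rollout.
            returns = adv + self.critic(s).squeeze(-1)
        # Normalizing advantages is a common stabilizer (an assumption here).
        adv = (adv - adv.mean()) / (adv.std(unbiased=False) + 1e-8)

        for _ in range(train_iters):
            idx = torch.randperm(len(s))[:batch_size]  # random minibatch

            # Clipped surrogate: limit how far the new policy can move from
            # the policy that collected the data.
            dist = Categorical(probs=self.actor(s[idx]))
            ratio = torch.exp(dist.log_prob(a[idx]) - old_lp[idx])
            clipped = torch.clamp(ratio, 1 - clip_ratio, 1 + clip_ratio)
            actor_loss = -torch.min(ratio * adv[idx], clipped * adv[idx]).mean()
            self.actor_opt.zero_grad()
            actor_loss.backward()
            self.actor_opt.step()

            # Critic regression toward the fixed return targets.
            value = self.critic(s[idx]).squeeze(-1)
            critic_loss = (value - returns[idx]).pow(2).mean()
            self.critic_opt.zero_grad()
            critic_loss.backward()
            self.critic_opt.step()
```

Under these assumptions, an agent built as `PPOAgent(Actor(4, 2), Critic(4))` exposes the `actor` and `critic` attributes that save_model and load_model expect.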