前言

在利用强化学习进行自动驾驶开发时,虽然目前已经有了CARLA、CARSIM、TORCS等一系列开发环境,但针对本硕等一些电脑配置不高的学生党来说,一个可编辑性高、上手难度不大、不吃配置的开发环境,用来进行算法验证是非常必要的。

环境的官方连接如下:

https://highway-env.readthedocs.io/en/latest/

优点

1、对电脑配置要求不高;

2、具有一定的车辆动力学模型;

3、可以按自己的需求绘制道路形状、改变车辆及道路的相关参数。

贡献

虽然官方文档给出了如何调用baseline库里的成熟模型的方法,但初学人员比较倾向于自己搭建调试网络。基于这一需求,结合目前主流的深度学习框架pytorch,利用DDPG算法给出了对车辆进行横、纵向控制的代码,相关代码仅搭建出可以跑通的结构,并未对网络参数进行详细调整,具体调参可根据自己的需要进行,这里仅给出跑通的框架。

代码

import math
import random
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal
import matplotlib.pyplot as plt
import pprint
import highway_envuse_cuda = torch.cuda.is_available()
print(use_cuda)
device   = torch.device("cuda" if use_cuda else "cpu")class ValueNetwork(nn.Module):def __init__(self, num_inputs, num_actions, hidden_size ,init_w = 3e-3):super(ValueNetwork, self).__init__()self.linear1 = nn.Linear(num_inputs + num_actions, hidden_size)self.linear2 = nn.Linear(hidden_size, hidden_size)self.linear3 = nn.Linear(hidden_size, 1)self.linear3.weight.data.uniform_(-init_w,init_w)self.linear3.bias.data.uniform_(-init_w,init_w)def forward(self, state, action):x = torch.cat([state, action], 1)x = F.relu(self.linear1(x))x = F.relu(self.linear2(x))x = self.linear3(x)return xclass PolicyNetwork(nn.Module):def __init__(self, num_inputs, num_actions, hidden_size, init_w = 3e-3):super(PolicyNetwork, self).__init__()self.linear1 = nn.Linear(num_inputs, hidden_size)self.linear2 = nn.Linear(hidden_size, hidden_size)self.linear3 = nn.Linear(hidden_size, num_actions)# uniform_将tensor用从均匀分布中抽样得到的值填充。参数初始化self.linear3.weight.data.uniform_(-init_w, init_w)#也用用normal_(0, 0.1) 来初始化的,高斯分布中抽样填充,这两种都是比较有效的初始化方式self.linear3.bias.data.uniform_(-init_w, init_w)#其意义在于我们尽可能保持 每个神经元的输入和输出的方差一致。#使用 RELU(without BN) 激活函数时,最好选用 He 初始化方法,将参数初始化为服从高斯分布或者均匀分布的较小随机数#使用 BN 时,减少了网络对参数初始值尺度的依赖,此时使用较小的标准差(eg:0.01)进行初始化即可#但是注意DRL中不建议使用BNdef forward(self, x):x = F.relu(self.linear1(x))x = F.relu(self.linear2(x))x = F.tanh(self.linear3(x))return xdef get_action(self, state):state = torch.FloatTensor(state).unsqueeze(0).to(device)action = self.forward(state)return action.detach().cpu().numpy()[0]class OUNoise(object):def __init__(self, action_space, mu=0.0, theta = 0.15, max_sigma = 0.3, min_sigma = 0.3, decay_period = 10000):#decay_period要根据迭代次数合理设置self.mu = muself.theta = thetaself.sigma = max_sigmaself.max_sigma = max_sigmaself.min_sigma = min_sigmaself.decay_period = decay_periodself.action_dim = action_space.shape[0]self.low = action_space.lowself.high = action_space.highself.reset()def reset(self):self.state = np.ones(self.action_dim) *self.mudef evolve_state(self):x = self.statedx = self.theta* (self.mu - x) + self.sigma * np.random.randn(self.action_dim)self.state = x + dxreturn self.statedef get_action(self, action, t=0):ou_state = self.evolve_state()self.sigma = self.max_sigma - (self.max_sigma - self.min_sigma) * min(1.0, t / self.decay_period)return np.clip(action + ou_state, self.low, self.high)class ReplayBuffer:def __init__(self, capacity):self.capacity = capacityself.buffer = []self.position = 0def push(self, state, action, reward, next_state, done):if len(self.buffer) < self.capacity:self.buffer.append(None)self.buffer[self.position] = (state, action, reward, next_state, done)self.position = (self.position + 1) % self.capacitydef sample(self, batch_size):batch = random.sample(self.buffer, batch_size)state, action, reward, next_state, done = map(np.stack, zip(*batch))return state, action, reward, next_state, donedef __len__(self):return len(self.buffer)class NormalizedActions(gym.ActionWrapper):def action(self, action):low_bound = self.action_space.lowupper_bound = self.action_space.highaction = low_bound + (action + 1.0) * 0.5 * (upper_bound - low_bound)#将经过tanh输出的值重新映射回环境的真实值内action = np.clip(action, low_bound, upper_bound)return actiondef reverse_action(self, action):low_bound = self.action_space.lowupper_bound = self.action_space.high#因为激活函数使用的是tanh,这里将环境输出的动作正则化到(-1,1)action = 2 * (action - low_bound) / (upper_bound - low_bound) - 1action = np.clip(action, low_bound, upper_bound)return actionclass DDPG(object):def __init__(self, action_dim, state_dim, hidden_dim):super(DDPG,self).__init__()self.action_dim, self.state_dim, self.hidden_dim = action_dim, state_dim, hidden_dimself.batch_size = 28self.gamma = 0.99self.min_value = -np.infself.max_value = np.infself.soft_tau = 2e-2self.replay_buffer_size = 8000self.value_lr = 5e-3self.policy_lr = 5e-4self.value_net = ValueNetwork(state_dim, action_dim, hidden_dim).to(device)self.policy_net = PolicyNetwork(state_dim, action_dim, hidden_dim).to(device)self.target_value_net = ValueNetwork(state_dim, action_dim, hidden_dim).to(device)self.target_policy_net = PolicyNetwork(state_dim, action_dim, hidden_dim).to(device)for target_param, param in zip(self.target_value_net.parameters(), self.value_net.parameters()):target_param.data.copy_(param.data)for target_param, param in zip(self.target_policy_net.parameters(), self.policy_net.parameters()):target_param.data.copy_(param.data)self.value_optimizer = optim.Adam(self.value_net.parameters(), lr=self.value_lr)self.policy_optimizer = optim.Adam(self.policy_net.parameters(), lr=self.policy_lr)self.value_criterion = nn.MSELoss()self.replay_buffer = ReplayBuffer(self.replay_buffer_size)def ddpg_update(self):state, action, reward, next_state, done = self.replay_buffer.sample(self.batch_size)state = torch.FloatTensor(state).to(device)next_state = torch.FloatTensor(next_state).to(device)action = torch.FloatTensor(action).to(device)reward = torch.FloatTensor(reward).unsqueeze(1).to(device)done = torch.FloatTensor(np.float32(done)).unsqueeze(1).to(device)policy_loss = self.value_net(state, self.policy_net(state))policy_loss = -policy_loss.mean()next_action = self.target_policy_net(next_state)target_value = self.target_value_net(next_state, next_action.detach())expected_value = reward + (1.0 - done) * self.gamma * target_valueexpected_value = torch.clamp(expected_value, self.min_value, self.max_value)value = self.value_net(state, action)value_loss = self.value_criterion(value, expected_value.detach())self.policy_optimizer.zero_grad()policy_loss.backward()self.policy_optimizer.step()self.value_optimizer.zero_grad()value_loss.backward()self.value_optimizer.step()for target_param, param in zip(self.target_value_net.parameters(), self.value_net.parameters()):target_param.data.copy_(target_param.data * (1.0 - self.soft_tau) + param.data * self.soft_tau)for target_param, param in zip(self.target_policy_net.parameters(), self.policy_net.parameters()):target_param.data.copy_(target_param.data * (1.0 - self.soft_tau) + param.data * self.soft_tau)env = gym.make("fei-v0")#自定义的环境,与自带的racetrack环境相同,目前在学习如何自定义自己的环境
env.configure(
{"observation": {"type": "OccupancyGrid","features": ['presence','on_road', "vx", "vy"],# "features_range": {# "x": [-100, 100],# "y": [-100, 100],# "vx": [-20, 20],# "vy": [-20, 20]},"grid_size": [[-6, 6], [-9, 9]],"grid_step": [3, 3],#每个网格的大小"as_image": False,"align_to_vehicle_axes": True},"action": {"type": "ContinuousAction","longitudinal": True,"lateral": True},"simulation_frequency": 15,"policy_frequency": 5,"duration": 500,"collision_reward": -10,"lane_centering_cost": 6,"action_reward": -0.3,"controlled_vehicles": 1,"other_vehicles": 5,"screen_width": 600,"screen_height": 600,"centering_position": [0.5, 0.5],"scaling": 7,"show_trajectories": False,"render_agent": True,"offscreen_rendering": False
})env.reset()
env = NormalizedActions(env)ou_noise = OUNoise(env.action_space)state_dim = env.observation_space.shape[2]*env.observation_space.shape[1]*env.observation_space.shape[0]
action_dim = env.action_space.shape[0]
print("状态维度"+str(state_dim))
print("动作维度"+str(action_dim))
# print(env.action_space)
hidden_dim = 256ddpg = DDPG(action_dim, state_dim, hidden_dim)max_steps = 250
rewards = []
batch_size = 32
VAR = 1  # control explorationfor step in range(max_steps):print("================第{}回合======================================".format(step+1))state = env.reset()state = torch.flatten(torch.tensor(state))ou_noise.reset()episode_reward = 0done = Falsest=0while not done:action = ddpg.policy_net.get_action(state)# print(action)action[0] = np.clip(np.random.normal(action[0],VAR),-1,1) # 在动作选择上添加随机噪声action[1] = np.clip(np.random.normal(action[1],VAR),-1,1) # 在动作选择上添加随机噪声# action = ou_noise.get_action(action, st)next_state, reward, done, _ = env.step(action)#奖励函数的更改需要自行打开安装的库在本地的位置进行修改next_state = torch.flatten(torch.tensor(next_state))if reward == 0.0:#车辆出界,回合结束reward = -10done = Trueddpg.replay_buffer.push(state, action, reward, next_state, done)if len(ddpg.replay_buffer) > batch_size:VAR *= .9995    # decay the action randomnessddpg.ddpg_update()state = next_stateepisode_reward += rewardenv.render()st=st+1rewards.append(episode_reward)print("回合奖励为:{}".format(episode_reward))
env.close()plt.plot(rewards)
plt.savefig('C:/Users/Administrator/Desktop/episode_reward.jpg')

后记

肯定会有我没意识到的错误,欢迎大家批评指正,大家共同进步!

基于highway-env的DDPG-pytorch自动驾驶实现相关推荐

  1. 基于RGB和LiDAR融合的自动驾驶3D语义分割

    基于RGB和LiDAR融合的自动驾驶3D语义分割 论文 RGB and LiDAR fusion based 3D Semantic Segmentationfor Autonomous Drivin ...

  2. 基于车路协同的高等级自动驾驶数据交互内容

    基于车路协同的高等级自动驾驶数据交互内容 基于车路协同的高等级自动驾驶系统组成 1. 系统架构 2. 系统功能 2.1 RSS各组成单元功能 2.2 VSS各组成单元功能 3. 系统交互 基于车路协同 ...

  3. 基于OpenCV和深度学习的自动驾驶遥控小车

    阅读原文 项目中使用的技术 Python + OpenCV Neural Network + Haar-Cascade Classifiers 项目目标 改装后遥控小车完成三个任务:自动驾驶,识别信号 ...

  4. 基于模型参考自适应控制(MRAC)的自动驾驶方向盘(油门)控制方法

    实习还是能学到很多学校学不到的东西,总结下实习期间学到的一个自适应控制方法.最近比较忙,先大致写下原理的笔记供自己复习,后面有空再更下仿真.如有错误请不吝赐教~ 背景 举个例子,目前公司自动驾驶车队主 ...

  5. 基于骁龙820A系统的自动驾驶发展趋势

    目前,随着无线技术的加速创新和发展,全球汽车行业正经历着前所未有的技术创新加速.数据显示,全球已有超过2000万辆汽车搭载了骁龙调制解调器. 高通提供了神经网络处理引擎(SNPE).这款深度学习开发套 ...

  6. 基于神经网络预测车道行驶的自动驾驶

    数据定义先假定 首先定义我们在自动驾驶,并且可以获取前后左右各个车的数据.传感器和摄像头已经在车身上部署好.  假定我们在车道上行驶,根据跟驰模型数据我们能知道的是: 1 左车距离 2 右车距离 3 ...

  7. 环宇智行基于NVIDIA TX2的L4级自动驾驶方案

    参考: http://www.in-driving.com/product/showproduct.php?lang=cn&id=54#ad-image-0 http://zhidx.com/ ...

  8. 基于激光雷达+惯导+轮速计的自动驾驶融合定位方案

    高精度定位是当前自动驾驶的基础.随着自动驾驶的日益发展,依靠单一传感器进行定位常常会受到各种限制,比如GPS信号不稳定或者消失.长直走廊或隧道等场景下激光点云发生漂移.轮胎打滑等造成瞬时轮速计异常等. ...

  9. 观点PK | 自动驾驶传感器“一哥之争”,这事儿你怎么看?

    自动驾驶现已成为人工智能技术应用落地的热门领域,但随着无人车迈出的步伐越大,面临的安全性方面的挑战也越大.而近期不断发生的自动驾驶车辆事故也将自动驾驶安全性的问题再次推向风口浪尖.自动驾驶车辆的安全性 ...

  10. 自动驾驶,别再谈「接管」色变了

    编者按:过去两年里,国内 Robotaxi 的公开运营多点开花,这些 Robotaxi 所搭载的自动驾驶系统也都具备了处理常见场景的能力,比如识别红绿灯.避让行人.变道超车等--在这种背景下,零接管似 ...

最新文章

  1. CCNA-(9)-思科交换机特点
  2. 【错误记录】Git 使用报错 ( no changes added to commit (use “git add“ and/or “git commit -a“) )
  3. MacOS系统下的图形化工具
  4. oracle 压测工具 ld,Oracle压力测试工具使用说明
  5. make的一些默认操作
  6. [开源] .Net ORM FreeSql 1.8.0-preview 最新动态播报
  7. go 微服务框架_清晰架构(Clean Architecture)的Go微服务
  8. 大数——大数相加(hdu1002)
  9. 华硕a501lb5200加内存和固盘并装上win7系统并设置固盘为第一启动
  10. 创业型 APP 如何筛选合适的推送平台
  11. 74-SSM项目实战前端开发
  12. 学习矩阵分析与应用过程中的点滴记录(一)
  13. RAM格式与JPG对比
  14. 量子计算机王,王正汉|量子计算机:下一轮工业革命的引擎
  15. printer: PJL
  16. Codechef April Challenge 2019 游记
  17. 使用nodejs-koa2-mysql-sequelize-jwt 实现项目api接口
  18. OSPF网络可以没有BDR(实验)
  19. 关于IOS由于Dropbox被封,https请求不好用的解决办法
  20. 4月计算机网络原理试题,4月全国自考计算机网络原理试题及答案解析.docx

热门文章

  1. Codeforces Round #481 (Div. 3) F. Mentors(排序,暴力,map记忆化)
  2. 自学java,学多久可以自己找到工作?
  3. 五分钟创建一个ChatGPT Plugin
  4. [Win32] API Hook(2)在64位系统上的实现
  5. 技术整理-开发过程中遇到问题-留存
  6. 银行系统(atm机)实现登陆注册存款取款转账
  7. (初学入门足够)Linux终端命令学习——在VMware虚拟机中创建的ubuntu(Linux最常用的发行版本)
  8. U盘文件只读的解决办法
  9. MES制造业生产管理系统源码 MES系统源码
  10. CTEX使用bib生成文献的问题解决。