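Preface

While studying reinforcement learning for autonomous driving, I decided to build my environments with the simulator provided by the highway-env library. After going through many tutorials, both Chinese and international, I found that they were all out of date: highway-env was updated around the start of 2023, and the tutorials from the previous two years all rely on old function signatures and return values.

What is highway-env?

Installation (latest version by default): pip install highway-env

First, here are the changes I found in the new version of the library:

Previously, step() returned four values: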

observation, reward, done, info = env.step(action)

Now it returns five values:

observation, reward, terminated, truncated, info = env.step(action)
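Code that has to survive both API generations can normalize the two return shapes in one place. The following is a minimal sketch that relies only on the tuple lengths described above; `unpack_step` is a hypothetical helper name, not part of gym or highway-env, and the tuples are stand-ins for real `env.step(action)` results.

```python
def unpack_step(result):
    """Normalize an env.step() result to (obs, reward, terminated, truncated, info)."""
    if len(result) == 5:
        # new API: already (obs, reward, terminated, truncated, info)
        return tuple(result)
    if len(result) == 4:
        # old API: (obs, reward, done, info); truncation was not reported separately
        obs, reward, done, info = result
        return obs, reward, done, False, info
    raise ValueError(f"unexpected step() result of length {len(result)}")


# stand-ins for real env.step(action) results
old_style = ([0.0, 1.0], 0.5, True, {})
new_style = ([0.0, 1.0], 0.5, False, True, {})

for result in (old_style, new_style):
    obs, reward, terminated, truncated, info = unpack_step(result)
    # an episode is over when it either terminated or was truncated
    episode_over = terminated or truncated
    print(terminated, truncated, episode_over)
```

With the five-value form, a training loop should stop on `terminated or truncated`, not on `terminated` alone.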

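I suspect the environment data used to be a plain ndarray:

data = env.reset()

data = array([[...], [...], ..., [...]], dtype=float32)

Now the environment data is a tuple:

data = env.reset()

data = (array([[...], [...], ..., [...]], dtype=float32), {"reward": {}, "terminated": {}, ...})

Because of these changes, the data-handling parts of the code have to change accordingly. When several libraries are used together, pay particular attention to version compatibility between them.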
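The same idea applies to reset(). This is a small sketch, assuming the new form is an (observation, info dict) pair as described above; `unpack_reset` is a hypothetical helper and the lists stand in for real observations.

```python
def unpack_reset(result):
    """Return only the observation from an env.reset() result."""
    # new API: reset() returns (observation, info), where info is a dict
    if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict):
        return result[0]
    # old API: reset() returns the observation itself
    return result


# stand-ins for real env.reset() results
old_reset = [[1.0, 0.0], [0.0, 0.0]]
new_reset = ([[1.0, 0.0], [0.0, 0.0]], {"speed": 8.0})

print(unpack_reset(old_reset) == unpack_reset(new_reset))
```

The code later in this post does the same thing inline by checking whether the state is a tuple and taking its first item.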

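Some reference code

My virtual environment setup (GPU):

What is a virtual environment? Somebody come and feed it a plate of nine-turn braised intestines.

The main packages that are required:

Based on Python 3.8.0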

pytorch

gym

highway

tqdm

matplotlib

pygame

numpy

highway-env
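Since the problems described earlier come from version mismatches, it helps to print what is actually installed before running anything. This is a sketch using only the standard library; the distribution names passed in (for example "torch" for PyTorch) are my assumption about how the packages are published, so adjust them to your setup.

```python
from importlib import metadata


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if it is missing."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


report = installed_versions(["gym", "highway-env", "torch", "numpy"])
for name, version in report.items():
    print(f"{name}: {version if version is not None else 'not installed'}")
```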

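Training uses the Double DQN algorithm; further variations built on top of it come later.

Create a Python file named double_dqn.py. The code below goes into that file; put the pieces together and you have the complete script.

The code comments are in English because the project I worked on was in English.

Libraries used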

import os
import copy
import random
import time

import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.nn.functional as F

import gym
import highway_env

Detect the device and initialize the default intersection environment

# set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Author: Da Xuanzi 2023-2-17
# Define the environment
env = gym.make("intersection-v0")
# details
env.config["duration"] = 13
env.config["vehicles_count"] = 20
env.config["vehicles_density"] = 1.3
env.config["reward_speed_range"] = [7.0, 10.0]
env.config["initial_vehicle_count"] = 10
env.config["simulation_frequency"] = 15
env.config["arrived_reward"] = 2
env.reset()

Structure of the intersection environment's configuration:

env.config
{
    "observation": {
        "type": "Kinematics",
        "vehicles_count": 15,
        "features": ["presence", "x", "y", "vx", "vy", "cos_h", "sin_h"],
        "features_range": {
            "x": [-100, 100],
            "y": [-100, 100],
            "vx": [-20, 20],
            "vy": [-20, 20],
        },
        "absolute": True,
        "flatten": False,
        "observe_intentions": False
    },
    "action": {
        "type": "DiscreteMetaAction",
        "longitudinal": False,
        "lateral": True
    },
    "duration": 13,  # [s]
    "destination": "o1",
    "initial_vehicle_count": 10,
    "spawn_probability": 0.6,
    "screen_width": 600,
    "screen_height": 600,
    "centering_position": [0.5, 0.6],
    "scaling": 5.5 * 1.3,
    "collision_reward": IntersectionEnv.COLLISION_REWARD,
    "normalize_reward": False
}
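The training scripts later pass state_dim=105. A plausible reading, assuming the Kinematics observation is a (vehicles_count, number of features) matrix that gets flattened before it enters the network, is that 105 is simply 15 × 7. The dictionary below copies only the relevant keys from the configuration shown above.

```python
# relevant part of the observation configuration shown above
observation_config = {
    "type": "Kinematics",
    "vehicles_count": 15,
    "features": ["presence", "x", "y", "vx", "vy", "cos_h", "sin_h"],
}

rows = observation_config["vehicles_count"]   # one row per observed vehicle
cols = len(observation_config["features"])    # one column per feature
state_dim = rows * cols                       # size of the flattened observation
print(rows, cols, state_dim)
```

If you change vehicles_count or the feature list, state_dim has to change with it.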

Building the network

The number of hidden-layer nodes can be customized.

class Net(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Net, self).__init__()
        # sizes of the two hidden layers
        hidden_nodes1 = 1024
        hidden_nodes2 = 512
        self.fc1 = nn.Linear(state_dim, hidden_nodes1)
        self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)
        self.fc3 = nn.Linear(hidden_nodes2, action_dim)

    def forward(self, state):
        # forward pass: two ReLU hidden layers, then a linear layer
        # that outputs one Q value per action
        x = state
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        out = self.fc3(x)
        return out

Building the learner (experience replay buffer)

class Replay:
    def __init__(self, buffer_size, init_length, state_dim, action_dim, env):
        self.buffer_size = buffer_size
        self.init_length = init_length
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.env = env
        self._storage = []
        self._init_buffer(init_length)

    def _init_buffer(self, n):
        # fill the buffer with n transitions collected with random actions
        state = self.env.reset()
        for i in range(n):
            action = self.env.action_space.sample()
            # env.step(action) returns (observation, reward, terminated, truncated, info)
            # observation: numpy array describing the vehicles
            # reward: reward for this action
            # terminated: bool, the episode reached a terminal state (named `done` here)
            # truncated: bool, the episode was cut off, e.g. by the time limit
            # info: dict with extra log information
            observation, reward, done, truncated, info = self.env.step(action)
            # env.reset() returns a tuple (ndarray, {"speed": ..., "crashed": ..., "action": ..., ...});
            # only the observation, its first item, is needed
            if isinstance(state, tuple):
                state = state[0]
            exp = {
                "state": state,
                "action": action,
                "reward": reward,
                "state_next": observation,
                "done": done,
            }
            self._storage.append(exp)
            state = observation
            # start a new episode when this one has ended for either reason
            if done or truncated:
                state = self.env.reset()

    def buffer_add(self, exp):
        # exp is a dict:
        # {"state": state, "action": action, "reward": reward,
        #  "state_next": observation, "done": terminated}
        self._storage.append(exp)
        if len(self._storage) > self.buffer_size:
            self._storage.pop(0)  # drop the oldest transition

    def buffer_sample(self, n):
        # draw n random transitions from the buffer
        return random.sample(self._storage, n)
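The buffer above evicts old transitions with list.pop(0), which shifts every remaining element. As an optional alternative, here is a sketch of the same interface on top of collections.deque with maxlen, which discards the oldest entry automatically. `DequeReplay` is a hypothetical name, and this is not the author's implementation.

```python
import random
from collections import deque


class DequeReplay:
    """Hypothetical variant of the replay buffer backed by deque(maxlen=...)."""

    def __init__(self, buffer_size):
        self._storage = deque(maxlen=buffer_size)

    def buffer_add(self, exp):
        # when the deque is full, appending silently discards the oldest entry
        self._storage.append(exp)

    def buffer_sample(self, n):
        # copy to a list so random.sample gets a plain sequence
        return random.sample(list(self._storage), n)

    def __len__(self):
        return len(self._storage)


buffer = DequeReplay(buffer_size=3)
for step in range(5):
    buffer.buffer_add(
        {"state": step, "action": 0, "reward": 0.0, "state_next": step + 1, "done": False}
    )

remaining = [exp["state"] for exp in buffer._storage]  # only the 3 newest survive
batch = buffer.buffer_sample(2)
print(remaining, len(batch))
```

For a buffer of 1000 entries the difference is small; it matters more if you raise buffer_size substantially.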

Building the learning agent

PATH = "<absolute or relative path to your output folder>"

class DOUBLEDQN(nn.Module):
    def __init__(
        self,
        env,  # gym environment
        state_dim,  # state size
        action_dim,  # action size
        lr=0.001,  # learning rate
        gamma=0.99,  # discount factor
        batch_size=5,  # batch size for each training step
        timestamp="",
    ):
        super(DOUBLEDQN, self).__init__()
        self.env = env
        self.env.reset()
        self.timestamp = timestamp
        # separate copy of the environment for evaluation
        self.test_env = copy.deepcopy(env)
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.learn_step_counter = 0
        self.is_rend = False
        self.target_net = Net(self.state_dim, self.action_dim).to(device)
        self.estimate_net = Net(self.state_dim, self.action_dim).to(device)
        self.ReplayBuffer = Replay(1000, 100, self.state_dim, self.action_dim, env)
        self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)

    def choose_our_action(self, state, epsilon=0.9):
        # epsilon-greedy action selection
        # state: ndarray, environment state
        # epsilon: float in [0, 1], probability of taking the greedy action
        # return: the chosen action
        if isinstance(state, tuple):
            state = state[0]
        # flatten the 2D observation into a 1D float tensor;
        # one NumPy reshape is much faster than appending row by row
        state = torch.FloatTensor(np.asarray(state).reshape(-1)).to(device)
        # np.random.rand() is uniform on [0, 1);
        # np.random.randn() is Gaussian and would not respect the epsilon threshold
        if np.random.rand() <= epsilon:
            # greedy: evaluate the Q values and take the best action
            action_value = self.estimate_net(state)
            action = torch.argmax(action_value).item()
        else:
            # explore: choose a random action
            action = np.random.randint(0, self.action_dim)
        return action

    def train(self, num_episode):
        # num_episode: total number of training episodes
        loss_list = []  # average loss per episode
        avg_reward_list = []  # evaluation reward per episode
        episode_reward = 0
        rend = 0
        # tqdm shows a progress bar
        for episode in tqdm(range(1, int(num_episode) + 1)):
            done = False
            truncated = False
            state = self.env.reset()
            each_loss = 0
            step = 0
            if isinstance(state, tuple):
                state = state[0]
            # stop when the episode terminates or is truncated by the time limit
            while not (done or truncated):
                if self.is_rend:
                    self.env.render()
                step += 1
                action = self.choose_our_action(state)
                observation, reward, done, truncated, info = self.env.step(action)
                exp = {
                    "state": state,
                    "action": action,
                    "reward": reward,
                    "state_next": observation,
                    "done": done,
                }
                self.ReplayBuffer.buffer_add(exp)
                state = observation
                # sample a random batch from the replay memory
                exp_batch = self.ReplayBuffer.buffer_sample(self.batch_size)
                # extract batch data
                action_batch = torch.LongTensor([exp["action"] for exp in exp_batch]).to(device)
                reward_batch = torch.FloatTensor([exp["reward"] for exp in exp_batch]).to(device)
                # 1 - done is zero for terminal transitions, so they do not bootstrap
                done_batch = torch.FloatTensor([1 - exp["done"] for exp in exp_batch]).to(device)
                # stack the observations with NumPy first; building a tensor
                # directly from a list of arrays is slow
                state_next_temp = [exp["state_next"] for exp in exp_batch]
                state_temp = [exp["state"] for exp in exp_batch]
                state_temp_list = np.array(state_temp)
                state_next_temp_list = np.array(state_next_temp)
                state_next_batch = torch.FloatTensor(state_next_temp_list).to(device)
                state_batch = torch.FloatTensor(state_temp_list).to(device)
                # reshape to (batch_size, -1)
                state_batch = state_batch.reshape(self.batch_size, -1)
                action_batch = action_batch.reshape(self.batch_size, -1)
                reward_batch = reward_batch.reshape(self.batch_size, -1)
                state_next_batch = state_next_batch.reshape(self.batch_size, -1)
                done_batch = done_batch.reshape(self.batch_size, -1)
                # estimated Q value of the action taken; gather(1, index) picks one column per row
                estimate_Q_value = self.estimate_net(state_batch).gather(1, action_batch)
                # Double DQN target: the estimate net selects the next action,
                # the target net evaluates it; detach() stops gradients
                max_action_index = self.estimate_net(state_next_batch).detach().argmax(1)
                target_Q_value = reward_batch + done_batch * self.gamma * self.target_net(
                    state_next_batch
                ).gather(1, max_action_index.unsqueeze(1))
                # unsqueeze(1) turns shape (n,) into (n, 1)
                # mean squared error between estimate and target
                loss = F.mse_loss(estimate_Q_value, target_Q_value)
                each_loss += loss.item()
                # update the estimate network
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
                # copy the estimate net into the target net every 10 learning steps
                if self.learn_step_counter % 10 == 0:
                    self.target_net.load_state_dict(self.estimate_net.state_dict())
                self.learn_step_counter += 1
            reward, count = self.eval()
            episode_reward += reward
            # rendering schedule; you can change these variables
            if episode_reward % 100 == 0:
                rend += 1
                if rend % 5 == 0:
                    self.is_rend = True
                else:
                    self.is_rend = False
            # save
            period = 1
            if episode % period == 0:
                each_loss /= step
                episode_reward /= period
                avg_reward_list.append(episode_reward)
                loss_list.append(each_loss)
                print("\nepisode:[{}/{}], \t each_loss: {:.4f}, \t episode_reward: {:.3f}, \t step: {}".format(
                    episode, num_episode, each_loss, episode_reward, count))
                # episode_reward = 0
                # create a new directory named after the timestamp
                path = PATH + "/" + self.timestamp
                os.makedirs(path, exist_ok=True)
                # save losses, rewards and network parameters
                np.save(path + "/DOUBLE_DQN_LOSS.npy", loss_list)
                np.save(path + "/DOUBLE_DQN_EACH_REWARD.npy", avg_reward_list)
                torch.save(self.estimate_net.state_dict(), path + "/DOUBLE_DQN_params.pkl")
        self.env.close()
        return loss_list, avg_reward_list

    def eval(self):
        # evaluate the current policy greedily on the test environment
        count = 0
        total_reward = 0
        done = False
        truncated = False
        state = self.test_env.reset()
        if isinstance(state, tuple):
            state = state[0]
        while not (done or truncated):
            action = self.choose_our_action(state, epsilon=1)
            observation, reward, done, truncated, info = self.test_env.step(action)
            total_reward += reward
            count += 1
            state = observation
        return total_reward, count
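The core of the training loop is the Double DQN target: the estimate network chooses the next action and the target network scores it. Below is a tensor-free sketch of that arithmetic for a single transition, with made-up Q values; `double_dqn_target` is a hypothetical helper used only for illustration.

```python
def double_dqn_target(reward, done, gamma, q_next_estimate, q_next_target):
    """Target for one transition: estimate net picks the action, target net scores it."""
    # action selection with the estimate network
    best_action = max(range(len(q_next_estimate)), key=lambda a: q_next_estimate[a])
    # terminal transitions do not bootstrap from the next state
    not_done = 0.0 if done else 1.0
    # action evaluation with the target network
    return reward + not_done * gamma * q_next_target[best_action]


# made-up Q values of the next state for three actions
q_next_estimate = [1.0, 3.0, 2.0]  # estimate net prefers action 1
q_next_target = [5.0, 0.5, 4.0]    # target net scores action 1 at 0.5

target = double_dqn_target(1.0, False, 0.99, q_next_estimate, q_next_target)
terminal_target = double_dqn_target(1.0, True, 0.99, q_next_estimate, q_next_target)
# plain DQN would take the maximum of the target net directly
vanilla_dqn_target = 1.0 + 0.99 * max(q_next_target)
print(target, terminal_target, vanilla_dqn_target)
```

The Double DQN target here is much smaller than the plain DQN one, because the target network's own maximum is never used for selection. Decoupling selection from evaluation is how Double DQN reduces overestimation.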

Building the run script

Hyperparameters such as lr and gamma can be set to your own values.

if __name__ == "__main__":
    # timestamp used as the output folder name
    named_tuple = time.localtime()
    time_string = time.strftime("%Y-%m-%d-%H-%M", named_tuple)
    print(time_string)
    # create a Double DQN object
    double_dqn_object = DOUBLEDQN(
        env,
        state_dim=105,
        action_dim=3,
        lr=0.001,
        gamma=0.99,
        batch_size=64,
        timestamp=time_string,
    )
    # number of training episodes
    iteration = 20
    # start training
    avg_loss, avg_reward_list = double_dqn_object.train(iteration)
    path = PATH + "/" + time_string
    np.save(path + "/DOUBLE_DQN_LOSS.npy", avg_loss)
    np.save(path + "/DOUBLE_DQN_EACH_REWARD.npy", avg_reward_list)
    torch.save(double_dqn_object.estimate_net.state_dict(), path + "/DOUBLE_DQN_params.pkl")
    torch.save(double_dqn_object.state_dict(), path + "/DOUBLE_DQN_MODEL.pt")

Plotting the saved data

Create a new file named draw_figures.py.

Replace each ? with your own path.

import matplotlib.pyplot as plt
import numpy as np
Loss = r"?\?\DOUBLE_DQN_LOSS.npy"
Reward = r"?\?\DOUBLE_DQN_EACH_REWARD.npy"
avg_loss = np.load(Loss)
avg_reward_list = np.load(Reward)
# print("loss", avg_loss)
# print("reward", avg_reward_list)
plt.figure(figsize=(10, 6))
plt.plot(avg_loss)
plt.grid()
plt.title("Double DQN Loss")
plt.xlabel("epochs")
plt.ylabel("loss")
plt.savefig(r"?\figures\double_dqn_loss.png", dpi=150)
plt.show()
plt.figure(figsize=(10, 6))
plt.plot(avg_reward_list)
plt.grid()
plt.title("Double DQN Training Reward")
plt.xlabel("epochs")
plt.ylabel("reward")
plt.savefig(r"?\figures\double_dqn_train_reward.png", dpi=150)
plt.show()

Tighnari's hand-made divider

Dueling_DQN

Only minor changes to the code above are needed.

class Net(nn.Module):
    def __init__(self, state_dim, action_dim):
        """
        Initialize the network
        : param state_dim: int, size of state space
        : param action_dim: int, size of action space
        """
        super(Net, self).__init__()
        hidden_nodes1 = 1024
        hidden_nodes2 = 512
        self.fc1 = nn.Linear(state_dim, hidden_nodes1)
        self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)
        # action_dim advantage outputs plus one state-value output (last column)
        self.fc3 = nn.Linear(hidden_nodes2, action_dim + 1)

    def forward(self, state):
        """
        Define the forward pass of the actor
        : param state: tensor, the state of the environment
        """
        x = state
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        out = self.fc3(x)
        return out


class Replay:
    def __init__(self, buffer_size, init_length, state_dim, action_dim, env):
        self.buffer_size = buffer_size
        self.init_length = init_length
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.env = env
        self._storage = []
        self._init_buffer(init_length)

    def _init_buffer(self, n):
        # fill the buffer with n transitions collected with random actions
        state = self.env.reset()
        for i in range(n):
            action = self.env.action_space.sample()
            # env.step(action) returns (observation, reward, terminated, truncated, info)
            # observation: numpy array describing the vehicles
            # reward: reward for this action
            # terminated: bool, the episode reached a terminal state (named `done` here)
            # truncated: bool, the episode was cut off, e.g. by the time limit
            # info: dict with extra log information
            observation, reward, done, truncated, info = self.env.step(action)
            # env.reset() returns a tuple (ndarray, info dict);
            # only the observation, its first item, is needed
            if isinstance(state, tuple):
                state = state[0]
            exp = {
                "state": state,
                "action": action,
                "reward": reward,
                "state_next": observation,
                "done": done,
            }
            self._storage.append(exp)
            state = observation
            # start a new episode when this one has ended for either reason
            if done or truncated:
                state = self.env.reset()

    def buffer_add(self, exp):
        # exp is a dict:
        # {"state": state, "action": action, "reward": reward,
        #  "state_next": observation, "done": terminated}
        self._storage.append(exp)
        if len(self._storage) > self.buffer_size:
            self._storage.pop(0)  # drop the oldest transition

    def buffer_sample(self, n):
        # draw n random transitions from the buffer
        return random.sample(self._storage, n)


class DUELDQN(nn.Module):
    def __init__(
        self,
        env,
        state_dim,
        action_dim,
        lr=0.001,
        gamma=0.99,
        batch_size=5,
        timestamp="",
    ):
        """
        : param env: object, a gym environment
        : param state_dim: int, size of state space
        : param action_dim: int, size of action space
        : param lr: float, learning rate
        : param gamma: float, discount factor
        : param batch_size: int, batch size for training
        """
        super(DUELDQN, self).__init__()
        self.env = env
        self.env.reset()
        self.timestamp = timestamp
        self.test_env = copy.deepcopy(env)  # for evaluation purpose
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.learn_step_counter = 0
        self.is_rend = False
        self.target_net = Net(self.state_dim, self.action_dim).to(device)
        self.estimate_net = Net(self.state_dim, self.action_dim).to(device)
        self.ReplayBuffer = Replay(1000, 100, self.state_dim, self.action_dim, env)
        self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)

    def choose_action(self, state, epsilon=0.9):
        # epsilon-greedy action selection
        # state: ndarray, environment state
        # epsilon: float in [0, 1], probability of taking the greedy action
        # return: the chosen action
        if isinstance(state, tuple):
            state = state[0]
        # flatten the 2D observation into a 1D float tensor
        state = torch.FloatTensor(np.asarray(state).reshape(-1)).to(device)
        # np.random.rand() is uniform on [0, 1);
        # np.random.randn() is Gaussian and would not respect the epsilon threshold
        if np.random.rand() <= epsilon:
            # greedy: drop the state-value output and take the largest advantage,
            # which is also the action with the largest Q value
            action_value = self.estimate_net(state)
            action_value = action_value[:-1]
            action = torch.argmax(action_value).item()
        else:
            # explore: choose a random action
            action = np.random.randint(0, self.action_dim)
        return action

    def calculate_duelling_q_values(self, duelling_q_network_output):
        """
        Calculate the Q values using the duelling network architecture.
        This is equation (9) in the paper.
        :param duelling_q_network_output: tensor, output of duelling q network
        :return: Q values
        """
        state_value = duelling_q_network_output[:, -1]
        avg_advantage = torch.mean(duelling_q_network_output[:, :-1], dim=1)
        q_values = state_value.unsqueeze(1) + (
            duelling_q_network_output[:, :-1] - avg_advantage.unsqueeze(1)
        )
        return q_values

    def train(self, num_episode):
        # num_episode: total number of training episodes
        loss_list = []  # average loss per episode
        avg_reward_list = []  # evaluation reward per episode
        episode_reward = 0
        # tqdm shows a progress bar
        for episode in tqdm(range(1, int(num_episode) + 1)):
            done = False
            truncated = False
            state = self.env.reset()
            each_loss = 0
            step = 0
            if isinstance(state, tuple):
                state = state[0]
            # stop when the episode terminates or is truncated by the time limit
            while not (done or truncated):
                if self.is_rend:
                    self.env.render()
                step += 1
                action = self.choose_action(state)
                observation, reward, done, truncated, info = self.env.step(action)
                exp = {
                    "state": state,
                    "action": action,
                    "reward": reward,
                    "state_next": observation,
                    "done": done,
                }
                self.ReplayBuffer.buffer_add(exp)
                state = observation
                # sample a random batch from the replay memory
                exp_batch = self.ReplayBuffer.buffer_sample(self.batch_size)
                # extract batch data
                action_batch = torch.LongTensor([exp["action"] for exp in exp_batch]).to(device)
                reward_batch = torch.FloatTensor([exp["reward"] for exp in exp_batch]).to(device)
                done_batch = torch.FloatTensor([1 - exp["done"] for exp in exp_batch]).to(device)
                # stack the observations with NumPy first; building a tensor
                # directly from a list of arrays is slow
                state_next_temp = [exp["state_next"] for exp in exp_batch]
                state_temp = [exp["state"] for exp in exp_batch]
                state_temp_list = np.array(state_temp)
                state_next_temp_list = np.array(state_next_temp)
                state_next_batch = torch.FloatTensor(state_next_temp_list).to(device)
                state_batch = torch.FloatTensor(state_temp_list).to(device)
                # reshape to (batch_size, -1)
                state_batch = state_batch.reshape(self.batch_size, -1)
                action_batch = action_batch.reshape(self.batch_size, -1)
                reward_batch = reward_batch.reshape(self.batch_size, -1)
                state_next_batch = state_next_batch.reshape(self.batch_size, -1)
                done_batch = done_batch.reshape(self.batch_size, -1)
                # get estimate Q value
                estimate_net_output = self.estimate_net(state_batch)
                estimate_Q = self.calculate_duelling_q_values(estimate_net_output)
                estimate_Q = estimate_Q.gather(1, action_batch)
                # get target Q value
                max_action_idx = self.estimate_net(state_next_batch)[:, :-1].detach().argmax(1)
                target_net_output = self.target_net(state_next_batch)
                target_Q = self.calculate_duelling_q_values(target_net_output).gather(
                    1, max_action_idx.unsqueeze(1)
                )
                target_Q = reward_batch + done_batch * self.gamma * target_Q
                # compute mse loss
                loss = F.mse_loss(estimate_Q, target_Q)
                each_loss += loss.item()
                # update network
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
                # update target network every 10 learning steps
                if self.learn_step_counter % 10 == 0:
                    self.target_net.load_state_dict(self.estimate_net.state_dict())
                self.learn_step_counter += 1
            reward, count = self.eval()
            episode_reward += reward
            # save
            period = 1
            if episode % period == 0:
                each_loss /= step
                episode_reward /= period
                avg_reward_list.append(episode_reward)
                loss_list.append(each_loss)
                print("\nepisode:[{}/{}], \t each_loss: {:.4f}, \t episode_reward: {:.3f}, \t step: {}".format(
                    episode, num_episode, each_loss, episode_reward, count))
                # episode_reward = 0
                # create a new directory named after the timestamp
                path = PATH + "/" + self.timestamp
                os.makedirs(path, exist_ok=True)
                np.save(path + "/DUELING_DQN_LOSS.npy", loss_list)
                np.save(path + "/DUELING_DQN_EACH_REWARD.npy", avg_reward_list)
                torch.save(self.estimate_net.state_dict(), path + "/DUELING_DQN_params.pkl")
        self.env.close()
        return loss_list, avg_reward_list

    def eval(self):
        # evaluate the current policy greedily on the test environment
        count = 0
        total_reward = 0
        done = False
        truncated = False
        state = self.test_env.reset()
        if isinstance(state, tuple):
            state = state[0]
        while not (done or truncated):
            action = self.choose_action(state, epsilon=1)
            state_next, reward, done, truncated, info = self.test_env.step(action)
            total_reward += reward
            count += 1
            state = state_next
        return total_reward, count


if __name__ == "__main__":
    # timestamp for saving
    named_tuple = time.localtime()  # get struct_time
    # folder name of the form "YYYY-mm-dd-HH-MM"
    time_string = time.strftime("%Y-%m-%d-%H-%M", named_tuple)
    duel_dqn_object = DUELDQN(
        env,
        state_dim=105,
        action_dim=3,
        lr=0.001,
        gamma=0.99,
        batch_size=64,
        timestamp=time_string,
    )
    path = PATH + "/" + time_string
    # train the policy
    iterations = 10
    avg_loss, avg_reward_list = duel_dqn_object.train(iterations)
    np.save(path + "/DUELING_DQN_LOSS.npy", avg_loss)
    np.save(path + "/DUELING_DQN_EACH_REWARD.npy", avg_reward_list)
    torch.save(duel_dqn_object.estimate_net.state_dict(), path + "/DUELING_DQN_params.pkl")
    torch.save(duel_dqn_object.state_dict(), path + "/DUELING_DQN_MODEL.pt")
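The only real difference from the Double DQN version is how the network output is turned into Q values: the last output is read as the state value V, the others as advantages A, and Q = V + (A - mean(A)). Here is a plain-Python sketch of that aggregation for a single state, with made-up numbers; `duelling_q_values` is a hypothetical stand-in for the tensor version in the class above.

```python
def duelling_q_values(network_output):
    """Combine [A(s, a_1), ..., A(s, a_n), V(s)] into Q values: Q = V + (A - mean(A))."""
    advantages = network_output[:-1]
    state_value = network_output[-1]
    mean_advantage = sum(advantages) / len(advantages)
    return [state_value + (a - mean_advantage) for a in advantages]


# three advantages followed by the state value
output = [1.0, 2.0, 3.0, 10.0]
q_values = duelling_q_values(output)

# V and mean(A) shift every action by the same amount,
# so the best action is the same for A and for Q
best_by_advantage = max(range(3), key=lambda a: output[a])
best_by_q = max(range(3), key=lambda a: q_values[a])
print(q_values, best_by_advantage, best_by_q)
```

This is why choose_action can simply drop the last output and take the argmax of the remaining advantages.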

DDQN+OtherChanges

Three 2D convolutional layers

# add CNN structure
class Net(nn.Module):
    def __init__(self, state_dim, action_dim):
        # initialize the network
        # state_dim: state space
        # action_dim: action space
        super(Net, self).__init__()
        # nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        # in_channels: the input has shape in_channels * in_N * in_N
        # out_channels: chosen by the designer
        # kernel_size: size of the convolution window
        # stride: step length
        # padding: padding size
        # out_N = (in_N - kernel_size + 2 * padding) / stride + 1
        self.cnn = nn.Sequential(
            # the first 2D convolutional layer
            nn.Conv2d(1, 4, kernel_size=3, padding=1),
            nn.BatchNorm2d(4),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=1),
            # the second 2D convolutional layer
            nn.Conv2d(4, 8, kernel_size=3, padding=1),
            nn.BatchNorm2d(8),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=1),
            # the third 2D convolutional layer (my own experiment; more layers can be tried)
            nn.Conv2d(8, 4, kernel_size=3, padding=1),
            nn.BatchNorm2d(4),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=1),
        )
        hidden_nodes1 = 1024
        hidden_nodes2 = 512
        # 4 channels * 1 * 9 features remain after the three conv/pool stages
        self.fc1 = nn.Linear(4 * 1 * 9, hidden_nodes1)
        self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)
        self.fc3 = nn.Linear(hidden_nodes2, action_dim)

    def forward(self, state):
        # forward pass: CNN feature extractor, flatten, then the fully connected layers
        x = state
        x = self.cnn(x)
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        out = self.fc3(x)
        return out


class Replay:
    def __init__(self, buffer_size, init_length, state_dim, action_dim, env):
        self.buffer_size = buffer_size
        self.init_length = init_length
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.env = env
        self._storage = []
        self._init_buffer(init_length)

    def _init_buffer(self, n):
        # fill the buffer with n transitions collected with random actions
        state = self.env.reset()
        for i in range(n):
            action = self.env.action_space.sample()
            # env.step(action) returns (observation, reward, terminated, truncated, info)
            # observation: numpy array describing the vehicles
            # reward: reward for this action
            # terminated: bool, the episode reached a terminal state (named `done` here)
            # truncated: bool, the episode was cut off, e.g. by the time limit
            # info: dict with extra log information
            observation, reward, done, truncated, info = self.env.step(action)
            # env.reset() returns a tuple (ndarray, info dict);
            # only the observation, its first item, is needed
            if isinstance(state, tuple):
                state = state[0]
            exp = {
                "state": state,
                "action": action,
                "reward": reward,
                "state_next": observation,
                "done": done,
            }
            self._storage.append(exp)
            state = observation
            # start a new episode when this one has ended for either reason
            if done or truncated:
                state = self.env.reset()

    def buffer_add(self, exp):
        # exp is a dict:
        # {"state": state, "action": action, "reward": reward,
        #  "state_next": observation, "done": terminated}
        self._storage.append(exp)
        if len(self._storage) > self.buffer_size:
            self._storage.pop(0)  # drop the oldest transition

    def buffer_sample(self, N):
        # draw N random transitions from the buffer
        return random.sample(self._storage, N)


class DOUBLEDQN_CNN(nn.Module):
    def __init__(
        self,
        env,  # gym environment
        state_dim,  # state size
        action_dim,  # action size
        lr=0.001,  # learning rate
        gamma=0.99,  # discount factor
        batch_size=5,  # batch size for each training step
        timestamp="",
    ):
        super(DOUBLEDQN_CNN, self).__init__()
        self.env = env
        self.env.reset()
        self.timestamp = timestamp
        # separate copy of the environment for evaluation
        self.test_env = copy.deepcopy(env)
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.learn_step_counter = 0
        self.is_rend = False
        self.target_net = Net(self.state_dim, self.action_dim).to(device)
        self.estimate_net = Net(self.state_dim, self.action_dim).to(device)
        self.ReplayBuffer = Replay(1000, 100, self.state_dim, self.action_dim, env)
        self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)

    def choose_action(self, state, epsilon=0.9):
        # epsilon-greedy action selection
        # state: ndarray, environment state
        # epsilon: float in [0, 1], probability of taking the greedy action
        # return: the chosen action
        if isinstance(state, tuple):
            state = state[0]
        # reshape to (batch, channel, height, width) = (1, 1, 7, state_dim // 7) for the CNN
        state = torch.FloatTensor(state).to(device).reshape(-1, 1, 7, self.state_dim // 7)
        # np.random.rand() is uniform on [0, 1);
        # np.random.randn() is Gaussian and would not respect the epsilon threshold
        if np.random.rand() <= epsilon:
            action_value = self.estimate_net(state)
            action = torch.argmax(action_value).item()
        else:
            action = np.random.randint(0, self.action_dim)
        return action

    def train(self, num_episode):
        # num_episode: total number of training episodes
        count_list = []
        loss_list = []
        total_reward_list = []
        avg_reward_list = []
        episode_reward = 0
        rend = 0
        for episode in tqdm(range(1, int(num_episode) + 1)):
            done = False
            truncated = False
            state = self.env.reset()
            each_loss = 0
            step = 0
            if isinstance(state, tuple):
                state = state[0]
            # stop when the episode terminates or is truncated by the time limit
            while not (done or truncated):
                if self.is_rend:
                    self.env.render()
                step += 1
                action = self.choose_action(state)
                observation, reward, done, truncated, info = self.env.step(action)
                exp = {
                    "state": state,
                    "action": action,
                    "reward": reward,
                    "state_next": observation,
                    "done": done,
                }
                self.ReplayBuffer.buffer_add(exp)
                state = observation
                # sample a random batch from the replay memory
                exp_batch = self.ReplayBuffer.buffer_sample(self.batch_size)
                # extract batch data
                action_batch = torch.LongTensor([exp["action"] for exp in exp_batch])
                reward_batch = torch.FloatTensor([exp["reward"] for exp in exp_batch])
                done_batch = torch.FloatTensor([1 - exp["done"] for exp in exp_batch])
                # stack the observations with NumPy first; building a tensor
                # directly from a list of arrays is slow
                state_next_temp = [exp["state_next"] for exp in exp_batch]
                state_temp = [exp["state"] for exp in exp_batch]
                state_temp_list = np.array(state_temp)
                state_next_temp_list = np.array(state_next_temp)
                state_next_batch = torch.FloatTensor(state_next_temp_list)
                state_batch = torch.FloatTensor(state_temp_list)
                # reshape; states become (batch, 1, 7, state_dim // 7) for the CNN
                state_batch = state_batch.to(device).reshape(self.batch_size, 1, 7, self.state_dim // 7)
                action_batch = action_batch.to(device).reshape(self.batch_size, -1)
                reward_batch = reward_batch.to(device).reshape(self.batch_size, -1)
                state_next_batch = state_next_batch.to(device).reshape(
                    self.batch_size, 1, 7, self.state_dim // 7
                )
                done_batch = done_batch.to(device).reshape(self.batch_size, -1)
                # get estimate Q value
                estimate_Q = self.estimate_net(state_batch).gather(1, action_batch)
                # get target Q value
                max_action_idx = self.estimate_net(state_next_batch).detach().argmax(1)
                target_Q = reward_batch + done_batch * self.gamma * self.target_net(
                    state_next_batch
                ).gather(1, max_action_idx.unsqueeze(1))
                # compute mse loss
                loss = F.mse_loss(estimate_Q, target_Q)
                each_loss += loss.item()
                # update network
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
                # update target network every 10 learning steps
                if self.learn_step_counter % 10 == 0:
                    self.target_net.load_state_dict(self.estimate_net.state_dict())
                self.learn_step_counter += 1
            reward, count = self.eval()
            episode_reward += reward
            # rendering schedule; you can change these variables
            if episode_reward % 100 == 0:
                rend += 1
                if rend % 5 == 0:
                    self.is_rend = True
                else:
                    self.is_rend = False
            # save
            period = 1
            if episode % period == 0:
                each_loss /= step
                episode_reward /= period
                avg_reward_list.append(episode_reward)
                loss_list.append(each_loss)
                print("\nepisode:[{}/{}], \t each_loss: {:.4f}, \t episode_reward: {:.3f}, \t step: {}".format(
                    episode, num_episode, each_loss, episode_reward, count))
                # episode_reward = 0
                # create a new directory named after the timestamp
                path = PATH + "/" + self.timestamp
                os.makedirs(path, exist_ok=True)
                # save losses, rewards and network parameters
                np.save(path + "/DOUBLE_DQN_CNN_LOSS.npy", loss_list)
                np.save(path + "/DOUBLE_DQN_CNN_EACH_REWARD.npy", avg_reward_list)
                torch.save(self.estimate_net.state_dict(), path + "/DOUBLE_DQN_CNN_params.pkl")
        self.env.close()
        return loss_list, avg_reward_list

    def eval(self):
        # evaluate the current policy greedily on the test environment
        count = 0
        total_reward = 0
        done = False
        truncated = False
        state = self.test_env.reset()
        if isinstance(state, tuple):
            state = state[0]
        while not (done or truncated):
            action = self.choose_action(state, epsilon=1)
            observation, reward, done, truncated, info = self.test_env.step(action)
            total_reward += reward
            count += 1
            state = observation
        return total_reward, count


if __name__ == "__main__":
    # timestamp used as the output folder name
    named_tuple = time.localtime()
    time_string = time.strftime("%Y-%m-%d-%H-%M", named_tuple)
    print(time_string)
    # create a Double DQN (CNN) object
    double_dqn_cnn_object = DOUBLEDQN_CNN(
        env,
        state_dim=105,
        action_dim=3,
        lr=0.001,
        gamma=0.99,
        batch_size=64,
        timestamp=time_string,
    )
    # number of training episodes
    iteration = 20
    # start training
    avg_loss, avg_reward_list = double_dqn_cnn_object.train(iteration)
    path = PATH + "/" + time_string
    np.save(path + "/DOUBLE_DQN_CNN_LOSS.npy", avg_loss)
    np.save(path + "/DOUBLE_DQN_CNN_EACH_REWARD.npy", avg_reward_list)
    torch.save(double_dqn_cnn_object.estimate_net.state_dict(), path + "/DOUBLE_DQN_CNN_params.pkl")
    torch.save(double_dqn_cnn_object.state_dict(), path + "/DOUBLE_DQN_CNN_MODEL.pt")
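The input size of the first fully connected layer, 4 * 1 * 9, follows from the output-size formula quoted in the comments of the CNN. The sketch below applies that formula to the (7, 15) grid the state is reshaped into; `out_size` is a hypothetical helper, and the layer parameters are copied from the network above.

```python
def out_size(in_size, kernel_size, stride=1, padding=0):
    """out_N = (in_N - kernel_size + 2 * padding) // stride + 1"""
    return (in_size - kernel_size + 2 * padding) // stride + 1


state_dim = 105
height, width = 7, state_dim // 7  # the (7, 15) grid fed to the CNN

for _ in range(3):
    # Conv2d(kernel_size=3, padding=1) keeps height and width unchanged
    height, width = out_size(height, 3, padding=1), out_size(width, 3, padding=1)
    # MaxPool2d(kernel_size=3, stride=1) shrinks each side by 2
    height, width = out_size(height, 3), out_size(width, 3)

channels = 4  # output channels of the last Conv2d
flattened = channels * height * width
print(height, width, flattened)
```

If state_dim or the pooling settings change, the input size of fc1 has to be recomputed in the same way.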

Experience pool (prioritized replay)
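As a numeric warm-up for the prioritized buffer in this section: each transition gets a priority (|TD error| + eps) ** alpha, the priorities are normalized into sampling probabilities, and the importance weight of a transition is (1 / P) * (1 / N). This is a standard-library sketch with made-up TD errors; the function names are hypothetical, and the formulas mirror the ones used in the code of this section.

```python
import math
import random


def sampling_probabilities(td_errors, alpha=0.6, eps=math.e ** -10):
    """p_i = (|td_error_i| + eps) ** alpha, normalized so the values sum to 1."""
    priorities = [(abs(err) + eps) ** alpha for err in td_errors]
    total = sum(priorities)
    return [p / total for p in priorities]


def importance_weights(probabilities):
    """w_i = (1 / P_i) * (1 / N): rarely sampled transitions get larger weights."""
    n = len(probabilities)
    return [(1.0 / p) * (1.0 / n) for p in probabilities]


td_errors = [0.1, 1.0, 5.0]  # made-up TD errors for three stored transitions
probs = sampling_probabilities(td_errors)
weights = importance_weights(probs)

# random.choices draws with replacement, proportionally to the weights
indices = random.choices(range(len(probs)), weights=probs, k=4)
print(probs, weights, indices)
```

Transitions with a larger TD error are sampled more often, and the importance weights compensate for that bias when the loss is computed.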

from collections import deque  # needed by Prioritized_Replay

class Net(nn.Module):
    def __init__(self, state_dim, action_dim):
        # state_dim: state space
        # action_dim: action space
        super(Net, self).__init__()
        hidden_nodes1 = 1024
        hidden_nodes2 = 512
        self.fc1 = nn.Linear(state_dim, hidden_nodes1)
        self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)
        self.fc3 = nn.Linear(hidden_nodes2, action_dim)

    def forward(self, state):
        # state: tensor
        x = state
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        out = self.fc3(x)
        return out


# Prioritized_Replay
class Prioritized_Replay:
    def __init__(
        self,
        buffer_size,
        init_length,
        state_dim,
        action_dim,
        est_Net,
        tar_Net,
        gamma,
    ):
        # state_dim: state space
        # action_dim: action space
        self.buffer_size = buffer_size
        self.init_length = init_length
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.is_rend = False
        self.priority = deque(maxlen=buffer_size)
        self._storage = []
        self._init_buffer(init_length, est_Net, tar_Net)

    def _init_buffer(self, n, est_Net, tar_Net):
        # n: number of initial samples; uses the module-level `env`
        state = env.reset()
        for i in range(n):
            action = env.action_space.sample()
            # env.step(action) returns (observation, reward, terminated, truncated, info)
            # observation: numpy array describing the vehicles
            # reward: reward for this action
            # terminated: bool, the episode reached a terminal state (named `done` here)
            # truncated: bool, the episode was cut off, e.g. by the time limit
            # info: dict with extra log information
            observation, reward, done, truncated, info = env.step(action)
            # env.reset() returns a tuple (ndarray, info dict);
            # only the observation, its first item, is needed
            if isinstance(state, tuple):
                state = state[0]
            exp = {
                "state": state,
                "action": action,
                "reward": reward,
                "state_next": observation,
                "done": done,
            }
            self.prioritize(est_Net, tar_Net, exp, alpha=0.6)
            self._storage.append(exp)
            state = observation
            # start a new episode when this one has ended for either reason
            if done or truncated:
                state = env.reset()

    def buffer_add(self, exp):
        # exp is a dict:
        # {"state": state, "action": action, "reward": reward,
        #  "state_next": observation, "done": terminated}
        self._storage.append(exp)
        if len(self._storage) > self.buffer_size:
            self._storage.pop(0)  # drop the oldest transition

    # compute and store the priority of one transition
    def prioritize(self, est_Net, tar_Net, exp, alpha=0.6):
        state = torch.FloatTensor(exp["state"]).to(device).reshape(-1)
        state_next = torch.FloatTensor(exp["state_next"]).to(device).reshape(-1)
        q = est_Net(state)[exp["action"]].detach().cpu().numpy()
        # the TD target bootstraps from the next state
        q_next = exp["reward"] + self.gamma * torch.max(est_Net(state_next).detach())
        # priority = (|TD error| + small constant) ** alpha
        p = (np.abs(q_next.cpu().numpy() - q) + (np.e ** -10)) ** alpha
        self.priority.append(p.item())

    def get_prioritized_batch(self, N):
        # convert the deque to an array, then normalize into sampling probabilities
        priority = np.array(self.priority)
        prob = priority / np.sum(priority)
        # random.choices(population, weights=None, *, cum_weights=None, k=1)
        # weights: relative probability of choosing each item
        # k: number of draws (with replacement)
        # cum_weights: cumulative weights, an alternative to weights
        sample_idxes = random.choices(range(len(prob)), k=N, weights=prob)
        importance = (1 / prob) * (1 / len(self.priority))
        sampled_importance = np.array(importance)[sample_idxes]
        sampled_batch = np.array(self._storage)[sample_idxes]
        return sampled_batch.tolist(), sampled_importance

    def buffer_sample(self, N):
        # draw N random transitions from the buffer
        return random.sample(self._storage, N)


class DDQNPB(nn.Module):
    def __init__(
        self,
        env,
        state_dim,
        action_dim,
        lr=0.001,
        gamma=0.99,
        buffer_size=1000,
        batch_size=50,
        beta=1,
        beta_decay=0.995,
        beta_min=0.01,
        timestamp="",
    ):
        # env: environment
        # state_dim: state space
        # action_dim: action space
        # lr: learning rate
        # gamma: discount factor
        # batch_size: training batch size
        super(DDQNPB, self).__init__()
        self.timestamp = timestamp
        self.test_env = copy.deepcopy(env)  # for evaluation purpose
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.learn_step_counter = 0
        self.target_net = Net(self.state_dim, self.action_dim).to(device)
        self.estimate_net = Net(self.state_dim, self.action_dim).to(device)
        self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)
        self.ReplayBuffer = Prioritized_Replay(
            buffer_size,
            100,
            self.state_dim,
            self.action_dim,
            self.estimate_net,
            self.target_net,
            gamma,
        )
        self.priority = self.ReplayBuffer.priority
        # NOTE: beta here corresponds to (1 - beta) in most online articles; only the notation differs
        # it starts from 1 and decays
        self.beta = beta
        self.beta_decay = beta_decay
        self.beta_min = beta_min

    def choose_action(self, state, epsilon=0.9):
        # state: env state
        # epsilon: [0,1]
        # return action you choose
        # get a 1D array
        if type(state) == 
type((1,)):state = state[0]temp = [exp for exp in state]target = []target = np.array(target)# n dimension to 1 dimension ndarrayfor i in temp:target = np.append(target, i)state = torch.FloatTensor(target).to(device)if np.random.randn() <= epsilon:action_value = self.estimate_net(state)action = torch.argmax(action_value).item()else:action = np.random.randint(0, self.action_dim)return actiondef train(self, num_episode):# num_epochs: training timesloss_list = []avg_reward_list = []episode_reward = 0for episode in tqdm(range(1,int(num_episode)+1)):done = Falsestate = env.reset()each_loss = 0step = 0rend = 0if type(state) == type((1,)):state = state[0]while not done:action = self.choose_action(state)observation, reward, done, _, info = env.step(action)# self.env.render()# store experience to replay memoryexp = {"state": state,"action": action,"reward": reward,"state_next": observation,"done": done,}self.ReplayBuffer.buffer_add(exp)state = observation# importance weightingif self.beta > self.beta_min:self.beta *= self.beta_decay# sample random batch from replay memoryexp_batch, importance = self.ReplayBuffer.get_prioritized_batch(self.batch_size)importance = torch.FloatTensor(importance ** (1 - self.beta)).to(device)# extract batch dataaction_batch = torch.LongTensor([exp["action"] for exp in exp_batch]).to(device)reward_batch = torch.FloatTensor([exp["reward"] for exp in exp_batch]).to(device)done_batch = torch.FloatTensor([1 - exp["done"] for exp in exp_batch]).to(device)# Slow method -> Fast method when having more datastate_next_temp = [exp["state_next"] for exp in exp_batch]state_temp = [exp["state"] for exp in exp_batch]state_temp_list = np.array(state_temp)state_next_temp_list = np.array(state_next_temp)state_next_batch = torch.FloatTensor(state_next_temp_list).to(device)state_batch = torch.FloatTensor(state_temp_list).to(device)# reshapestate_batch = state_batch.reshape(self.batch_size, -1)action_batch = action_batch.reshape(self.batch_size, -1)reward_batch = 
reward_batch.reshape(self.batch_size, -1)state_next_batch = state_next_batch.reshape(self.batch_size, -1)done_batch = done_batch.reshape(self.batch_size, -1)# get estimate Q valueestimate_Q = self.estimate_net(state_batch).gather(1, action_batch)# get target Q valuemax_action_idx = self.estimate_net(state_next_batch).detach().argmax(1)target_Q = reward_batch + done_batch * self.gamma * self.target_net(state_next_batch).gather(1, max_action_idx.unsqueeze(1))# compute mse loss# loss = F.mse_loss(estimate_Q, target_Q)loss = torch.mean(torch.multiply(torch.square(estimate_Q - target_Q), importance))each_loss += loss.item()# update networkself.optimizer.zero_grad()loss.backward()self.optimizer.step()#TODO# update target networkif self.learn_step_counter % 10 == 0:# self.update_target_networks()self.target_net.load_state_dict(self.estimate_net.state_dict())self.learn_step_counter += 1step += 1env.render()# you can update these variables# if episode_reward % 100 == 0:#     rend += 1#     if rend % 5 == 0:#         self.is_rend = True#     else:#         self.is_rend = Falsereward, count = self.eval()episode_reward += reward# saveperiod = 1if episode % period == 0:# logeach_loss /= periodepisode_reward /= periodavg_reward_list.append(episode_reward)loss_list.append(each_loss)print("\nepoch: [{}/{}], \tavg loss: {:.4f}, \tavg reward: {:.3f}, \tsteps: {}".format(episode, num_episode, each_loss, episode_reward, count))# episode_reward = 0# create a new directory for savingpath = PATH + "/" + self.timestamptry:os.makedirs(path)except OSError:passnp.save(path + "/DOUBLE_DQN_PRIORITIZED_LOSS.npy", loss_list)np.save(path + "/DOUBLE_DQN_PRIORITIZED_REWARD.npy", avg_reward_list)torch.save(self.estimate_net.state_dict(),path + "/DOUBLE_DQN_PRIORITIZED_params.pkl")env.close()return loss_list, avg_reward_listdef eval(self):"""Evaluate the policy"""count = 0total_reward = 0done = Falsestate = self.test_env.reset()if type(state) == type((1,)):state = state[0]while not done:action = 
self.choose_action(state, epsilon=1)observation, reward, done, truncated, info = self.test_env.step(action)total_reward += rewardcount += 1state = observationreturn total_reward, countif __name__ == "__main__":# timestamp for savingnamed_tuple = time.localtime()  # get struct_timetime_string = time.strftime("%Y-%m-%d-%H-%M", named_tuple)double_dqn_prioritized_object = DDQNPB(env,state_dim=105,action_dim=3,lr=0.001,gamma=0.99,buffer_size=1000,batch_size=64,timestamp=time_string,)# Train the policyiterations = 10000avg_loss, avg_reward_list = double_dqn_prioritized_object.train(iterations)path = PATH + "/" + time_stringnp.save(path + "/DOUBLE_DQN_PRIORITIZED_LOSS.npy", avg_loss)np.save(path + "/DOUBLE_DQN_PRIORITIZED_REWARD.npy", avg_reward_list)torch.save(double_dqn_prioritized_object.estimate_net.state_dict(), path + "/DOUBLE_DQN_PRIORITIZED_params.pkl")torch.save(double_dqn_prioritized_object.state_dict(), path + "/DOUBLE_DQN_PRIORITIZED_MODEL.pt")
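
To make the sampling in get_prioritized_batch easier to follow, here is a small standalone sketch of proportional prioritized sampling in plain Python. The helper names (sampling_probabilities, importance_weights, sample_indices) and the sample TD errors are made up for illustration. It also makes two assumptions that differ from the class above: beta is used directly as the exponent (the class writes the exponent as 1 - beta), and the weights are scaled so the largest is 1, which the class does not do.

```python
import random


def sampling_probabilities(td_errors, alpha=0.6, eps=1e-5):
    """Turn absolute TD errors into probabilities p_i^alpha / sum_k p_k^alpha."""
    priorities = [(abs(e) + eps) ** alpha for e in td_errors]
    total = sum(priorities)
    return [p / total for p in priorities]


def importance_weights(probs, beta=1.0):
    """Importance-sampling weights (1 / (N * P(i))) ** beta, scaled so the largest is 1."""
    n = len(probs)
    weights = [(1.0 / (n * p)) ** beta for p in probs]
    largest = max(weights)
    return [w / largest for w in weights]


def sample_indices(probs, k, rng=random):
    """Draw k indices with replacement, proportionally to probs."""
    return rng.choices(range(len(probs)), weights=probs, k=k)


# Hypothetical TD errors for four stored transitions.
td_errors = [0.1, 2.0, 0.5, 0.0]
probs = sampling_probabilities(td_errors)
weights = importance_weights(probs)
batch = sample_indices(probs, k=3, rng=random.Random(0))
```

The transition with the largest TD error is sampled most often and gets the smallest weight, which is what keeps the weighted loss from being dominated by the transitions that are replayed most.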

Feel free to change things yourself. The bugs you track down yourself are the best bugs! (just kidding)
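
If you do start changing things, the Double DQN target is probably the part worth understanding first. Below is a minimal sketch in plain Python with made-up Q-values; double_dqn_target and argmax are illustrative helpers, not functions from double_dqn.py. It mirrors the idea in the training loop: the estimate (online) network picks the next action and the target network scores it.

```python
def argmax(values):
    """Index of the largest entry in a list."""
    return max(range(len(values)), key=lambda i: values[i])


def double_dqn_target(reward, terminated, q_online_next, q_target_next, gamma=0.99):
    """Select the next action with the online network, evaluate it with the target network."""
    if terminated:
        # No bootstrapping past a terminal state.
        return reward
    best_action = argmax(q_online_next)
    return reward + gamma * q_target_next[best_action]


# Hypothetical Q-values for the next state from both networks.
q_online_next = [1.0, 3.0, 2.0]
q_target_next = [0.5, 1.5, 4.0]

target = double_dqn_target(1.0, False, q_online_next, q_target_next, gamma=0.9)
terminal_target = double_dqn_target(1.0, True, q_online_next, q_target_next, gamma=0.9)
```

A plain DQN would bootstrap from max(q_target_next), which is 4.0 here. Double DQN uses the target network's value for the action the online network prefers, which is 1.5, and that tends to reduce overestimation.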

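Afterword:

On custom environments: I just spent 30 minutes digging into this. The official tutorial is a mess (doge), so here is the walkthrough I worked out myself: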

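  1. Locate your highway-env installation directory. Mine is at: E:\formalFiles\Anaconda3-2020.07\envs\autodrive_38\Lib\site-packages\highway_env
  2. Inside highway_env, the envs folder holds the definition files for the various scenarios. I use intersection_env.py as the example here; the others work the same way. Create a new file test_env.py and copy the entire contents of intersection_env.py into it.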

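  3. In test_env.py, rename the class and change the actions as follows:
    class test(AbstractEnv):
        #
        # ACTIONS: Dict[int, str] = {
        #     0: 'SLOWER',
        #     1: 'IDLE',
        #     2: 'FASTER'
        # }
        ACTIONS: Dict[int, str] = {
            0: 'LANE_LEFT',
            1: 'IDLE',
            2: 'LANE_RIGHT',
            3: 'FASTER',
            4: 'SLOWER'
        }

    Delete every class definition other than the first one. The change above expands the action set to 5 actions.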

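  4. Append the following to the end of envs/__init__.py:
    from highway_env.envs.test_env import *
  5. The highway_env folder itself contains a separate __init__.py (not the file from the previous step!). Modify it as follows:
    def register_highway_envs():
        """Import the envs module so that envs register themselves."""

        # my test environment
        register(
            id='test-v0',  # the name passed to gym.make
            entry_point='highway_env.envs:test'  # the environment class name
        )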
  6. Modify the reward. Open your environment definition file highway_env/envs/test_env.py and find the _reward function along with the related _agent_reward function and others; adjust the terms as you see fit. The lmap() function lives in utils.py.
    def _reward(self, action: int) -> float:
        """Aggregated reward, for cooperative agents."""
        return sum(self._agent_reward(action, vehicle) for vehicle in self.controlled_vehicles
                   ) / len(self.controlled_vehicles)

    def _agent_reward(self, action: int, vehicle: Vehicle) -> float:
        """Per-agent reward signal."""
        rewards = self._agent_rewards(action, vehicle)
        reward = sum(self.config.get(name, 0) * reward for name, reward in rewards.items())
        reward = self.config["arrived_reward"] if rewards["arrived_reward"] else reward
        reward *= rewards["on_road_reward"]
        if self.config["normalize_reward"]:
            reward = utils.lmap(reward, [self.config["collision_reward"], self.config["arrived_reward"]], [0, 1])
        return reward

    def _agent_rewards(self, action: int, vehicle: Vehicle) -> Dict[Text, float]:
        """Per-agent per-objective reward signal."""
        scaled_speed = utils.lmap(vehicle.speed, self.config["reward_speed_range"], [0, 1])
        return {
            "collision_reward": vehicle.crashed,
            "high_speed_reward": np.clip(scaled_speed, 0, 1),
            "arrived_reward": self.has_arrived(vehicle),
            "on_road_reward": vehicle.on_road
        }
  7. Use the custom environment like this:
    import highway_env
    import gym

    env = gym.make("test-v0")
    env.reset()
  8. My custom environment file. These are my personal settings and do not represent the best result:
    from typing import Dict, Tuple, Text

    import numpy as np
    from highway_env import utils
    from highway_env.envs.common.abstract import AbstractEnv, MultiAgentWrapper
    from highway_env.road.lane import LineType, StraightLane, CircularLane, AbstractLane
    from highway_env.road.regulation import RegulatedRoad
    from highway_env.road.road import RoadNetwork
    from highway_env.vehicle.kinematics import Vehicle
    from highway_env.vehicle.controller import ControlledVehicle


    class test(AbstractEnv):
        #
        # ACTIONS: Dict[int, str] = {
        #     0: 'SLOWER',
        #     1: 'IDLE',
        #     2: 'FASTER'
        # }
        ACTIONS: Dict[int, str] = {
            0: 'LANE_LEFT',
            1: 'IDLE',
            2: 'LANE_RIGHT',
            3: 'FASTER',
            4: 'SLOWER'
        }
        ACTIONS_INDEXES = {v: k for k, v in ACTIONS.items()}

        @classmethod
        def default_config(cls) -> dict:
            config = super().default_config()
            config.update({
                "observation": {
                    "type": "Kinematics",
                    "vehicles_count": 15,
                    "features": ["presence", "x", "y", "vx", "vy", "cos_h", "sin_h"],
                    "features_range": {
                        "x": [-100, 100],
                        "y": [-100, 100],
                        "vx": [-20, 20],
                        "vy": [-20, 20],
                    },
                    "absolute": True,
                    "flatten": False,
                    "observe_intentions": False
                },
                "action": {
                    "type": "DiscreteMetaAction",
                    "longitudinal": True,
                    "lateral": True,
                    "target_speeds": [0, 4.5, 9]
                },
                "duration": 13,  # [s]
                "destination": "o1",
                "controlled_vehicles": 1,
                "initial_vehicle_count": 10,
                "spawn_probability": 0.6,
                "screen_width": 600,
                "screen_height": 600,
                "centering_position": [0.5, 0.6],
                "scaling": 5.5 * 1.3,
                "collision_reward": -10,
                "high_speed_reward": 2,
                "arrived_reward": 5,
                "reward_speed_range": [7.0, 9.0],  # change
                "normalize_reward": False,
                "offroad_terminal": False
            })
            return config

        def _reward(self, action: int) -> float:
            """Aggregated reward, for cooperative agents."""
            return sum(self._agent_reward(action, vehicle) for vehicle in self.controlled_vehicles
                       ) / len(self.controlled_vehicles)

        def _rewards(self, action: int) -> Dict[Text, float]:
            """Multi-objective rewards, for cooperative agents."""
            agents_rewards = [self._agent_rewards(action, vehicle) for vehicle in self.controlled_vehicles]
            return {
                name: sum(agent_rewards[name] for agent_rewards in agents_rewards) / len(agents_rewards)
                for name in agents_rewards[0].keys()
            }

        # edit your reward
        def _agent_reward(self, action: int, vehicle: Vehicle) -> float:
            """Per-agent reward signal."""
            rewards = self._agent_rewards(action, vehicle)
            reward = sum(self.config.get(name, 0) * reward for name, reward in rewards.items())
            reward = self.config["arrived_reward"] if rewards["arrived_reward"] else reward
            reward *= rewards["on_road_reward"]
            if self.config["normalize_reward"]:
                reward = utils.lmap(reward, [self.config["collision_reward"], self.config["arrived_reward"]], [0, 1])
            return reward

        def _agent_rewards(self, action: int, vehicle: Vehicle) -> Dict[Text, float]:
            """Per-agent per-objective reward signal."""
            scaled_speed = utils.lmap(vehicle.speed, self.config["reward_speed_range"], [0, 1])
            return {
                "collision_reward": vehicle.crashed,
                "high_speed_reward": np.clip(scaled_speed, 0, 1),
                "arrived_reward": self.has_arrived(vehicle),
                "on_road_reward": vehicle.on_road
            }

        def _is_terminated(self) -> bool:
            return any(vehicle.crashed for vehicle in self.controlled_vehicles) \
                   or all(self.has_arrived(vehicle) for vehicle in self.controlled_vehicles) \
                   or (self.config["offroad_terminal"] and not self.vehicle.on_road)

        def _agent_is_terminal(self, vehicle: Vehicle) -> bool:
            """The episode is over when a collision occurs or when the access ramp has been passed."""
            return (vehicle.crashed or
                    self.has_arrived(vehicle) or
                    self.time >= self.config["duration"])

        def _is_truncated(self) -> bool:
            """The episode is truncated once the configured duration has elapsed."""
            return self.time >= self.config["duration"]

        def _info(self, obs: np.ndarray, action: int) -> dict:
            info = super()._info(obs, action)
            info["agents_rewards"] = tuple(self._agent_reward(action, vehicle) for vehicle in self.controlled_vehicles)
            info["agents_dones"] = tuple(self._agent_is_terminal(vehicle) for vehicle in self.controlled_vehicles)
            return info

        def _reset(self) -> None:
            self._make_road()
            self._make_vehicles(self.config["initial_vehicle_count"])

        def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, dict]:
            obs, reward, terminated, truncated, info = super().step(action)
            self._clear_vehicles()
            self._spawn_vehicle(spawn_probability=self.config["spawn_probability"])
            return obs, reward, terminated, truncated, info

        def _make_road(self) -> None:
            """
            Make a 4-way intersection.

            The horizontal road has the right of way. More precisely, the levels of priority are:
                - 3 for horizontal straight lanes and right-turns
                - 1 for vertical straight lanes and right-turns
                - 2 for horizontal left-turns
                - 0 for vertical left-turns

            The code for nodes in the road network is:
            (o:outer | i:inner + [r:right, l:left]) + (0:south | 1:west | 2:north | 3:east)

            :return: the intersection road
            """
            lane_width = AbstractLane.DEFAULT_WIDTH
            right_turn_radius = lane_width + 5  # [m]
            left_turn_radius = right_turn_radius + lane_width  # [m]
            outer_distance = right_turn_radius + lane_width / 2
            access_length = 50 + 50  # [m]

            net = RoadNetwork()
            n, c, s = LineType.NONE, LineType.CONTINUOUS, LineType.STRIPED
            for corner in range(4):
                angle = np.radians(90 * corner)
                is_horizontal = corner % 2
                priority = 3 if is_horizontal else 1
                rotation = np.array([[np.cos(angle), -np.sin(angle)], [np.sin(angle), np.cos(angle)]])
                # Incoming
                start = rotation @ np.array([lane_width / 2, access_length + outer_distance])
                end = rotation @ np.array([lane_width / 2, outer_distance])
                net.add_lane("o" + str(corner), "ir" + str(corner),
                             StraightLane(start, end, line_types=[s, c], priority=priority, speed_limit=10))
                # Right turn
                r_center = rotation @ (np.array([outer_distance, outer_distance]))
                net.add_lane("ir" + str(corner), "il" + str((corner - 1) % 4),
                             CircularLane(r_center, right_turn_radius, angle + np.radians(180), angle + np.radians(270),
                                          line_types=[n, c], priority=priority, speed_limit=10))
                # Left turn
                l_center = rotation @ (np.array([-left_turn_radius + lane_width / 2, left_turn_radius - lane_width / 2]))
                net.add_lane("ir" + str(corner), "il" + str((corner + 1) % 4),
                             CircularLane(l_center, left_turn_radius, angle + np.radians(0), angle + np.radians(-90),
                                          clockwise=False, line_types=[n, n], priority=priority - 1, speed_limit=10))
                # Straight
                start = rotation @ np.array([lane_width / 2, outer_distance])
                end = rotation @ np.array([lane_width / 2, -outer_distance])
                net.add_lane("ir" + str(corner), "il" + str((corner + 2) % 4),
                             StraightLane(start, end, line_types=[s, n], priority=priority, speed_limit=10))
                # Exit
                start = rotation @ np.flip([lane_width / 2, access_length + outer_distance], axis=0)
                end = rotation @ np.flip([lane_width / 2, outer_distance], axis=0)
                net.add_lane("il" + str((corner - 1) % 4), "o" + str((corner - 1) % 4),
                             StraightLane(end, start, line_types=[n, c], priority=priority, speed_limit=10))

            road = RegulatedRoad(network=net, np_random=self.np_random, record_history=self.config["show_trajectories"])
            self.road = road

        def _make_vehicles(self, n_vehicles: int = 10) -> None:
            """
            Populate a road with several vehicles on the highway and on the merging lane

            :return: the ego-vehicle
            """
            # Configure vehicles
            vehicle_type = utils.class_from_path(self.config["other_vehicles_type"])
            vehicle_type.DISTANCE_WANTED = 5  # Low jam distance
            vehicle_type.COMFORT_ACC_MAX = 6
            vehicle_type.COMFORT_ACC_MIN = -3

            # Random vehicles
            simulation_steps = 3
            for t in range(n_vehicles - 1):
                self._spawn_vehicle(np.linspace(0, 80, n_vehicles)[t])
            for _ in range(simulation_steps):
                [(self.road.act(), self.road.step(1 / self.config["simulation_frequency"]))
                 for _ in range(self.config["simulation_frequency"])]

            # Challenger vehicle
            self._spawn_vehicle(60, spawn_probability=1, go_straight=True, position_deviation=0.1, speed_deviation=0)

            # Controlled vehicles
            self.controlled_vehicles = []
            for ego_id in range(0, self.config["controlled_vehicles"]):
                ego_lane = self.road.network.get_lane(("o{}".format(ego_id % 4), "ir{}".format(ego_id % 4), 0))
                destination = self.config["destination"] or "o" + str(self.np_random.randint(1, 4))
                ego_vehicle = self.action_type.vehicle_class(
                    self.road,
                    ego_lane.position(60 + 5*self.np_random.normal(1), 0),
                    speed=ego_lane.speed_limit,
                    heading=ego_lane.heading_at(60))
                try:
                    ego_vehicle.plan_route_to(destination)
                    ego_vehicle.speed_index = ego_vehicle.speed_to_index(ego_lane.speed_limit)
                    ego_vehicle.target_speed = ego_vehicle.index_to_speed(ego_vehicle.speed_index)
                except AttributeError:
                    pass

                self.road.vehicles.append(ego_vehicle)
                self.controlled_vehicles.append(ego_vehicle)
                for v in self.road.vehicles:  # Prevent early collisions
                    if v is not ego_vehicle and np.linalg.norm(v.position - ego_vehicle.position) < 20:
                        self.road.vehicles.remove(v)

        def _spawn_vehicle(self,
                           longitudinal: float = 0,
                           position_deviation: float = 1.,
                           speed_deviation: float = 1.,
                           spawn_probability: float = 0.6,
                           go_straight: bool = False) -> None:
            if self.np_random.uniform() > spawn_probability:
                return

            route = self.np_random.choice(range(4), size=2, replace=False)
            route[1] = (route[0] + 2) % 4 if go_straight else route[1]
            vehicle_type = utils.class_from_path(self.config["other_vehicles_type"])
            vehicle = vehicle_type.make_on_lane(self.road, ("o" + str(route[0]), "ir" + str(route[0]), 0),
                                                longitudinal=(longitudinal + 5
                                                              + self.np_random.normal() * position_deviation),
                                                speed=8 + self.np_random.normal() * speed_deviation)
            for v in self.road.vehicles:
                if np.linalg.norm(v.position - vehicle.position) < 15:
                    return
            vehicle.plan_route_to("o" + str(route[1]))
            vehicle.randomize_behavior()
            self.road.vehicles.append(vehicle)
            return vehicle

        def _clear_vehicles(self) -> None:
            is_leaving = lambda vehicle: "il" in vehicle.lane_index[0] and "o" in vehicle.lane_index[1] \
                                         and vehicle.lane.local_coordinates(vehicle.position)[0] \
                                         >= vehicle.lane.length - 4 * vehicle.LENGTH
            self.road.vehicles = [vehicle for vehicle in self.road.vehicles if
                                  vehicle in self.controlled_vehicles or not (is_leaving(vehicle) or vehicle.route is None)]

        def has_arrived(self, vehicle: Vehicle, exit_distance: float = 25) -> bool:
            return "il" in vehicle.lane_index[0] \
                   and "o" in vehicle.lane_index[1] \
                   and vehicle.lane.local_coordinates(vehicle.position)[0] >= exit_distance
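
To see how the reward terms from step 6 combine, here is a standalone sketch that recomputes the per-agent reward by hand using the weights from the step 8 config. The lmap and clip helpers are local re-implementations written for this example (lmap is assumed to behave like the linear map in utils.py), and agent_reward is an illustrative function, not part of highway-env.

```python
def lmap(v, x, y):
    """Linearly map v from the range x = [x0, x1] to the range y = [y0, y1]."""
    return y[0] + (v - x[0]) * (y[1] - y[0]) / (x[1] - x[0])


def clip(v, low, high):
    """Clamp v into [low, high]."""
    return max(low, min(high, v))


# Reward weights copied from the custom config in step 8.
CONFIG = {
    "collision_reward": -10,
    "high_speed_reward": 2,
    "arrived_reward": 5,
    "reward_speed_range": [7.0, 9.0],
}


def agent_reward(speed, crashed, arrived, on_road, config=CONFIG):
    """Weighted sum of the reward terms, overridden on arrival and zeroed off-road."""
    scaled_speed = lmap(speed, config["reward_speed_range"], [0, 1])
    rewards = {
        "collision_reward": float(crashed),
        "high_speed_reward": clip(scaled_speed, 0, 1),
        "arrived_reward": float(arrived),
        "on_road_reward": float(on_road),
    }
    # Terms without a matching config key (on_road_reward here) get weight 0.
    reward = sum(config.get(name, 0) * value for name, value in rewards.items())
    if rewards["arrived_reward"]:
        reward = config["arrived_reward"]
    return reward * rewards["on_road_reward"]


cruising = agent_reward(speed=8.0, crashed=False, arrived=False, on_road=True)
crashed_fast = agent_reward(speed=9.0, crashed=True, arrived=False, on_road=True)
arrived = agent_reward(speed=5.0, crashed=False, arrived=True, on_road=True)
off_road = agent_reward(speed=8.0, crashed=False, arrived=False, on_road=False)
```

Working through a few cases like this before training makes it easier to judge whether a collision penalty of -10 is large enough relative to the speed bonus.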

Oh, everyone wants a visualization, right? Here it comes.

Training plot for double_dqn.py under test-v0 (action_dim == 5):
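
For readers who want to draw a similar plot from their own run, here is one possible sketch. It assumes the .npy files were written by the training script above into a run directory (PATH plus the timestamp); moving_average and plot_training_curves are names made up for this example, and numpy and matplotlib are imported only inside the plotting function.

```python
def moving_average(values, window=10):
    """Trailing moving average; early entries average over what is available so far."""
    smoothed = []
    running = 0.0
    for i, v in enumerate(values):
        running += v
        if i >= window:
            # Drop the value that just left the window.
            running -= values[i - window]
        smoothed.append(running / min(i + 1, window))
    return smoothed


def plot_training_curves(run_dir, window=10):
    """Plot raw and smoothed reward and loss curves saved by the training script."""
    import os
    import numpy as np
    import matplotlib.pyplot as plt

    rewards = np.load(os.path.join(run_dir, "DOUBLE_DQN_PRIORITIZED_REWARD.npy")).tolist()
    losses = np.load(os.path.join(run_dir, "DOUBLE_DQN_PRIORITIZED_LOSS.npy")).tolist()

    fig, (ax_reward, ax_loss) = plt.subplots(1, 2, figsize=(10, 4))
    for ax, series, title in ((ax_reward, rewards, "reward"), (ax_loss, losses, "loss")):
        ax.plot(series, alpha=0.3, label="raw")
        ax.plot(moving_average(series, window), label="moving average")
        ax.set_xlabel("episode")
        ax.set_ylabel(title)
        ax.legend()
    fig.tight_layout()
    plt.show()


example = moving_average([1, 2, 3, 4], window=2)
```

Calling plot_training_curves with the directory of a finished run should show both curves side by side; the smoothing window is a matter of taste.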

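This is currently a single-agent setup. A multi-agent version will need changes to the input data and the actions, as well as to the number of controlled vehicles; that is left as a future improvement.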
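
As a starting point for that change, the sketch below builds a config fragment following the multi-agent settings described in the highway-env documentation (MultiAgentAction and MultiAgentObservation wrapping the single-agent types). The multi_agent_config helper is hypothetical, and this has not been tried against test-v0 or the training script above, so treat it as an assumption to verify.

```python
def multi_agent_config(n_agents=2):
    """Config overrides that ask highway-env to control several vehicles at once."""
    return {
        "controlled_vehicles": n_agents,
        "action": {
            "type": "MultiAgentAction",
            # Each agent keeps the same discrete meta-actions as the single-agent setup.
            "action_config": {
                "type": "DiscreteMetaAction",
                "longitudinal": True,
                "lateral": True,
            },
        },
        "observation": {
            "type": "MultiAgentObservation",
            # Each agent receives its own Kinematics observation.
            "observation_config": {
                "type": "Kinematics",
                "vehicles_count": 15,
                "features": ["presence", "x", "y", "vx", "vy", "cos_h", "sin_h"],
            },
        },
    }


config = multi_agent_config(2)
```

With a config like this, the environment is expected to take one action per controlled vehicle and return one observation per vehicle, so choose_action and the replay buffer would need to handle tuples instead of a single array and a single integer.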

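Anything else? Wait until I have the multi-agent version written 0-0!

Additions from kind readers are welcome... this is no-man's land, after all (sob)

One day I will become a godlike Mr. Tighnari!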
