#########################################################
import torch
import torch.nn as nn
import torch.nn.functional as F
from numpy import *
import numpy as np
import time
import matplotlib.pyplot as plt
import math
import os
from tensorboardX import SummaryWriter
# #################### hyper parameters ####################
LR_ACTOR = 0.001
LR_CRITIC = 0.002
GAMMA = 0.99
TAU = 0.001
MEMORY_CAPACITY = 10000
BATCH_SIZE = 128
s_dim = 5
a_dim = 3
# max_action = 10 # action range is [-10, 10]
# action_low = -max_action
# action_high = max_action
NN_nodes = 30
log_interval = 50
################################################################
# device = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
script_name = os.path.basename(__file__)
directory = './exp' + script_name + '/'
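# TensorBoard logs and model checkpoints below are written under this directory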
########################## DDPG Framework ######################
class ActorNet(nn.Module): # define the network structure for actor and critic
    def __init__(self, s_dim, a_dim, max_action):
super(ActorNet, self).__init__()
self.fc1 = nn.Linear(s_dim, NN_nodes)
self.fc1.weight.data.normal_(0, 0.1) # initialization of FC1
self.out = nn.Linear(NN_nodes, a_dim)
        self.out.weight.data.normal_(0, 0.1)  # initialization of OUT
self.max_action = max_action
def forward(self, x):
x = self.fc1(x)
x = F.relu(x)
x = self.out(x)
x = torch.tanh(x)
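        # tanh bounds the raw output to [-1, 1]; scaling by max_action maps it to [-max_action, max_action]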
actions = x * self.max_action
return actions
class CriticNet(nn.Module):
def __init__(self, s_dim, a_dim):
super(CriticNet, self).__init__()
self.fcs = nn.Linear(s_dim, NN_nodes)
self.fcs.weight.data.normal_(0, 0.1)
self.fca = nn.Linear(a_dim, NN_nodes)
self.fca.weight.data.normal_(0, 0.1)
self.out = nn.Linear(NN_nodes, 1)
self.out.weight.data.normal_(0, 0.1)
def forward(self, s, a):
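        # Q(s, a): state and action are embedded separately, summed, passed through ReLU, then mapped to a scalar value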
x = self.fcs(s)
y = self.fca(a)
actions_value = self.out(F.relu(x + y))
return actions_value
class DDPG(object):
def __init__(self, a_dim, s_dim, action_high):
self.a_dim, self.s_dim, self.a_bound = a_dim, s_dim, action_high
self.memory = np.zeros((MEMORY_CAPACITY, s_dim * 2 + a_dim + 1), dtype=np.float32)
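        # each row stores one transition laid out as [s (s_dim) | a (a_dim) | r (1) | s_ (s_dim)], i.e. 2 * 5 + 3 + 1 = 14 columns with the dimensions above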
self.pointer = 0 # serves as updating the memory data
# Create the 4 network objects
        self.actor_eval = ActorNet(s_dim, a_dim, self.a_bound).to(device)
        self.actor_target = ActorNet(s_dim, a_dim, self.a_bound).to(device)
self.actor_target.load_state_dict(self.actor_eval.state_dict())
self.critic_eval = CriticNet(s_dim, a_dim).to(device)
self.critic_target = CriticNet(s_dim, a_dim).to(device)
self.critic_target.load_state_dict(self.critic_eval.state_dict())
# create 2 optimizers for actor and critic
self.actor_optimizer = torch.optim.Adam(self.actor_eval.parameters(), lr=LR_ACTOR)
self.critic_optimizer = torch.optim.Adam(self.critic_eval.parameters(), lr=LR_CRITIC)
# Define the loss function for critic network update
self.loss_func = nn.MSELoss().to(device)
self.writer = SummaryWriter(directory)
self.num_critic_update_iteration = 0
self.num_actor_update_iteration = 0
def store_transition(self, s, a, r, s_): # how to store the episodic data to buffer
transition = np.hstack((s, a, [r], s_))
index = self.pointer % MEMORY_CAPACITY # replace the old data with new data
self.memory[index, :] = transition
self.pointer += 1
def choose_action(self, s):
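        # return the deterministic policy action for state s (no exploration noise is added here)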
# print(s)
s = torch.unsqueeze(torch.FloatTensor(s), 0).to(device)
return self.actor_eval(s)[0].cpu().detach()
def learn(self):
# softly update the target networks
        # theta_target <- TAU * theta_eval + (1 - TAU) * theta_target for every parameter tensor
        for target_param, eval_param in zip(self.actor_target.parameters(), self.actor_eval.parameters()):
            target_param.data.copy_((1 - TAU) * target_param.data + TAU * eval_param.data)
        for target_param, eval_param in zip(self.critic_target.parameters(), self.critic_eval.parameters()):
            target_param.data.copy_((1 - TAU) * target_param.data + TAU * eval_param.data)
# sample from buffer a mini-batch data
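        # note: sampling over the full capacity assumes learn() is only called after the buffer has been filled at least once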
indices = np.random.choice(MEMORY_CAPACITY, size=BATCH_SIZE)
batch_trans = self.memory[indices, :]
# extract data from mini-batch of transitions including s, a, r, s_
batch_s = torch.FloatTensor(batch_trans[:, :self.s_dim]).to(device)
batch_a = torch.FloatTensor(batch_trans[:, self.s_dim:self.s_dim + self.a_dim]).to(device)
batch_r = torch.FloatTensor(batch_trans[:, -self.s_dim - 1: -self.s_dim]).to(device)
batch_s_ = torch.FloatTensor(batch_trans[:, -self.s_dim:]).to(device)
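        # the column slices above mirror the hstack order used in store_transition: [s | a | r | s_]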
# make action and evaluate its action values
a = self.actor_eval(batch_s)
q = self.critic_eval(batch_s, a)
actor_loss = -torch.mean(q)
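        # maximizing Q(s, mu(s)) is implemented as gradient descent on its negative mean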
self.writer.add_scalar('Loss/actor_loss', actor_loss, global_step=self.num_actor_update_iteration)
# optimize the loss of actor network
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
# compute the target Q value using the information of next state
        with torch.no_grad():
            a_target = self.actor_target(batch_s_)
            q_tmp = self.critic_target(batch_s_, a_target)
            q_target = batch_r + GAMMA * q_tmp  # Bellman target: y = r + GAMMA * Q'(s_, mu'(s_))
# compute the current q value and the loss
q_eval = self.critic_eval(batch_s, batch_a)
td_error = self.loss_func(q_target, q_eval)
self.writer.add_scalar('Loss/critic_loss', td_error, global_step=self.num_critic_update_iteration)
# optimize the loss of critic network
self.critic_optimizer.zero_grad()
td_error.backward()
self.critic_optimizer.step()
self.num_actor_update_iteration += 1
self.num_critic_update_iteration += 1
critic_loss = td_error
return actor_loss, critic_loss
def save(self):
torch.save(self.actor_eval.state_dict(), directory + 'actor_eval.pth')
torch.save(self.critic_eval.state_dict(), directory + 'critic_eval.pth')
def load(self):
self.actor_eval.load_state_dict(torch.load(directory + 'actor_eval.pth'))
self.critic_eval.load_state_dict(torch.load(directory + 'critic_eval.pth'))
print("====================================")
print("model has been loaded...")
print("====================================")
# **************************************************************************
def data_save(data_name, data):
    # save an array for later plotting, creating the target directory if it does not exist yet
    dir_now = './data for draw figure/'
    os.makedirs(dir_now, exist_ok=True)
    np.save(dir_now + data_name, data)
# *************************************************************************
# used to shape the reward
def smooth_l1_reward(x1, x2):
x = x1 - x2
if abs(x) > 0.1:
out = abs(x)
else:
out = 10 * x ** 2
return out
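# e.g. smooth_l1_reward(0.55, 0.5) = 10 * 0.05**2 = 0.025, while smooth_l1_reward(1.0, 0.5) = 0.5;
# the two branches agree at |x| = 0.1, where both evaluate to 0.1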
# ############################## Training ######################################
# Define the env
def heading_angle(u, state):
    # discretized model, sampling time Ts = 0.05 s, Tustin (bilinear) discretization
# heading angle
b0 = 0.0001124
b1 = 0.0002249
b2 = 0.0001124
a1 = 1.912
a2 = -0.9122
y_1 = math.asin(state[0])
y_2 = math.asin(state[2])
y_out = b0 * u[0] + b1 * u[1] + b2 * u[2] + a1 * y_1 + a2 * y_2
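    # Tustin-discretized second-order model: the new heading angle is a weighted sum of the three stored inputs u[0..2]
    # and the two previous angles y_1, y_2 (recovered from the state via asin)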
# heading angle speed
db0 = 0.004498
db1 = 0.004498
da1 = 0.9122
dy_1 = state[4]
    dy_out = db0 * u[0] + db1 * u[1] + da1 * dy_1