#coding:utf-8
import rospy
import numpy as np
import math
from math import pi
from geometry_msgs.msg import Twist, Point, Pose
from sensor_msgs.msg import LaserScan
from nav_msgs.msg import Odometry
from tf.transformations import euler_from_quaternion, quaternion_from_euler
from geometry_msgs.msg import PoseStamped
import copy
target_not_movable = False
class Env():
def __init__(self, action_dim=2):
self.goal_x = 0
self.goal_y = 0
self.heading = 0
self.initGoal = True
self.get_goalbox = False
self.position = Pose()
self.pub_cmd_vel = rospy.Publisher('cmd_vel', Twist, queue_size=5)
self.sub_odom = rospy.Subscriber('odom', Odometry, self.getOdometry)
self.past_distance = 0.
self.stopped = 0
self.action_dim = action_dim
#Keys CTRL + c will stop script
rospy.on_shutdown(self.shutdown)
#停止运动
#等待目标点
def wait_for_goal(self):
raw_goal_data = None
print('waiting goal...')
while raw_goal_data is None:
try:
raw_goal_data = rospy.wait_for_message('move_base_simple/goal',PoseStamped, timeout=100)
except:
pass
self.goal_x = raw_goal_data.pose.position.x
self.goal_y = raw_goal_data.pose.position.y
print('goal_position:',self.goal_x,self.goal_y)
return self.goal_x, self.goal_y
def shutdown(self):
rospy.loginfo("Stopping TurtleBot")
self.pub_cmd_vel.publish(Twist())
rospy.sleep(1)
#获取目标距离
def getGoalDistace(self):
goal_distance = round(math.hypot(self.goal_x - self.position.x, self.goal_y - self.position.y), 2)
self.past_distance = goal_distance
print('goal_distance:',goal_distance)
return goal_distance
#获得里程计数据
def getOdometry(self, odom):
self.past_position = copy.deepcopy(self.position)
self.position = odom.pose.pose.position
orientation = odom.pose.pose.orientation
orientation_list = [orientation.x, orientation.y, orientation.z, orientation.w]
_, _, yaw = euler_from_quaternion(orientation_list)
goal_angle = math.atan2(self.goal_y - self.position.y, self.goal_x - self.position.x)
heading = goal_angle - yaw
if heading > pi:
heading -= 2 * pi
elif heading < -pi:
heading += 2 * pi
self.heading = round(heading, 3)
#获取激光雷达数据
def getState(self, scan, past_action):
scan_range = []
heading = self.heading
min_range = 0.1
done = False
print(len(scan.ranges))
if 528>=len(scan.ranges)>=510:
t =22
elif len(scan.ranges)>=580:
t=25
# elif len(scan.ranges)==580:
# t=24
else:
t =23
for i in range(0, len(scan.ranges), t):
if scan.ranges[i] == float('Inf')or scan.ranges[i] ==0 :#or scan.ranges[i] == 0:
scan_range.append(3.5)
elif np.isnan(scan.ranges[i]):
scan_range.append(0)
else:
scan_range.append(scan.ranges[i])
if min_range > min(scan_range) > 0:
done = True
for pa in past_action:
scan_range.append(pa)
current_distance = round(math.hypot(self.goal_x - self.position.x, self.goal_y - self.position.y),2)
if current_distance < 0.15:
self.get_goalbox = True
return scan_range + [heading, current_distance], done
#设置奖励
def setReward(self, state, done):
current_distance = state[-1]
distance_rate = (self.past_distance - current_distance)
if distance_rate > 0:
reward = 2.
if distance_rate <= 0:
reward = -2.
self.past_distance = current_distance
a, b, c, d = float('{0:.3f}'.format(self.position.x)), float('{0:.3f}'.format(self.past_position.x)), float('{0:.3f}'.format(self.position.y)), float('{0:.3f}'.format(self.past_position.y))
if a == b and c == d:
self.stopped += 1
if self.stopped == 20:
rospy.loginfo('Robot is in the same 20 times in a row')
self.stopped = 0
done = True
else:
self.stopped = 0
if done:
rospy.loginfo("Collision!!")
reward = -10.
self.pub_cmd_vel.publish(Twist())
if self.get_goalbox:
rospy.loginfo("Goal!!")
reward = 100.
self.pub_cmd_vel.publish(Twist())
self.goal_x, self.goal_y = self.wait_for_goal()
self.goal_distance = self.getGoalDistace()
self.get_goalbox = False
return reward, done
def step(self, action, past_action):
linear_vel = action[0,0]
ang_vel = action[0,1]
vel_cmd = Twist()
vel_cmd.linear.x = linear_vel
vel_cmd.angular.z = ang_vel
self.pub_cmd_vel.publish(vel_cmd)
data = None
while data is None:
try:
print('wait_for_laser_service')
data = rospy.wait_for_message('scan', LaserScan, timeout=5)
except:
pass
state, done = self.getState(data, past_action)
reward, done = self.setReward(state, done)
return np.asarray(state), reward, done
def reset(self):
data = None
while data is None:
try:
print('wait_for_laser_service')
data = rospy.wait_for_message('scan', LaserScan, timeout=5)
except:
pass
if self.initGoal:
self.goal_x, self.goal_y = self.wait_for_goal()
self.initGoal = False
self.goal_distance = self.getGoalDistace()
state, done = self.getState(data,[0]*self.action_dim)
return np.asarray(state)