#!/usr/bin/env python
# coding=utf-8
# Author : Xionghui Chen
# Created : 2017.1.22
# Modified : 2017.2.3
# Version : 1.0
# Route.py
"""
路线类,包含了一个公路的诸多段落
"""
import copy
import numpy as np
import random
import json
import logging
from matplotlib import pyplot as plt
from functions import binomial_creator, do_probability_test
from Global import MAX_PATH, CARS_INFO, TIME_SLICE, CELL_RATIO, MAX_VELOCITY
from Car import NoAutoCar
class Route(object):
    """A route made of several consecutive road segments.

    Each segment is a ``Path`` object stored in ``path_list``.  The plotting
    methods read, from every segment, ``cell_amount`` and the per-lane,
    per-time-step ``recorder`` lists, and draw time-space diagrams with
    matplotlib.
    """

    def __init__(self, route_id, resource_item_list):
        """Build the segments of this route.

        :param route_id: identifier of this route.
        :param resource_item_list: sized sequence of resource items; each
            item is passed unchanged to ``Path`` to build one segment.
        """
        self.route_id = route_id  # id of this route
        self.route_amount = len(resource_item_list)  # number of segments
        # One Path per resource item, in the given order.
        self.path_list = [Path(item) for item in resource_item_list]

    def _vehicle_track(self, car_id, lane):
        """Return the ``(xs, ys)`` trajectory of one vehicle in one lane.

        ``xs`` are cell positions counted from the start of the route (the
        cell counts of preceding segments are added as an offset) and ``ys``
        are the time-step indices at which the vehicle was found.

        :param car_id: value looked up in every recorded time step.
        :param lane: lane index into each segment's ``recorder``.
        """
        xs = []
        ys = []
        cell_offset = 0
        for path in self.path_list:
            for time_index, cells in enumerate(path.recorder[lane]):
                try:
                    position = cells.index(car_id)
                except ValueError:
                    # The vehicle is not in this segment at this time step.
                    # NOTE(review): assumes every record is a list/tuple, as
                    # the use of ``.index`` suggests -- confirm.
                    continue
                xs.append(position + cell_offset)
                ys.append(time_index)
            cell_offset += path.cell_amount
        return xs, ys

    def _event_points(self, lane, recorder_name):
        """Return ``(xs, ys)`` for every entry of a per-lane event recorder.

        ``recorder_name`` is the name of a segment attribute indexed as
        ``[lane][time_step]`` whose entries are iterables of cell positions.
        Positions are shifted by the cell counts of preceding segments.
        """
        xs = []
        ys = []
        cell_offset = 0
        for path in self.path_list:
            for time_index, places in enumerate(getattr(path, recorder_name)[lane]):
                for place in places:
                    xs.append(place + cell_offset)
                    ys.append(time_index)
            cell_offset += path.cell_amount
        return xs, ys

    def _draw_vehicle_tracks(self, lane, count_max):
        """Draw one black line per vehicle id from 2 up to ``count_max``."""
        # Ids start at 2; presumably 0 and 1 are reserved cell markers
        # -- TODO confirm against the recorder's encoding.
        car_id = 2
        while car_id <= count_max:
            xs, ys = self._vehicle_track(car_id, lane)
            plt.plot(xs, ys, 'k-')
            car_id += 1

    def _set_axis_limits(self, lane):
        """Limit the axes to the route's total cells and recorded time steps.

        The time extent is taken from the last segment's recorder.  Nothing
        is done for a route without segments, since there is no extent.
        """
        if not self.path_list:
            return
        total_cells = sum(path.cell_amount for path in self.path_list)
        time_steps = len(self.path_list[-1].recorder[lane])
        plt.axis([0, total_cells, 0, time_steps])

    def plot_for_multi_path(self, count_max):
        """Show a time-space diagram for every lane, one subplot per lane.

        For each lane index below ``MAX_PATH`` the subplot contains the
        entries of ``turn_in_from_recorder`` as red circles, the entries of
        ``turn_out_to_recorder`` as blue circles, and one black trajectory
        line per vehicle id in ``2..count_max``.

        :param count_max: largest vehicle id to draw.
        """
        # Font able to render the Chinese labels, plus an ASCII minus sign.
        plt.rcParams['font.sans-serif'] = ['SimHei']
        plt.rcParams['axes.unicode_minus'] = False

        for lane in range(MAX_PATH):
            plt.subplot(MAX_PATH, 1, 1 + lane)

            x_in, y_in = self._event_points(lane, 'turn_in_from_recorder')
            x_out, y_out = self._event_points(lane, 'turn_out_to_recorder')
            plt.plot(x_in, y_in, 'or')
            plt.plot(x_out, y_out, 'ob')

            self._draw_vehicle_tracks(lane, count_max)

            plt.title(u"道路编号 %s 的时空分布图" % lane, fontsize=15)
            plt.xlabel(u"道路单元标号 ")
            plt.ylabel(u"时间单元", fontsize=15)
            self._set_axis_limits(lane)
        plt.show()

    def plot(self, count_max):
        """Show the time-space diagram of the last lane (``MAX_PATH - 1``).

        One black trajectory line is drawn per vehicle id in
        ``2..count_max``.  The axis limits are derived from the segments
        themselves, so the call also works when ``count_max`` is below 2
        and no trajectory is drawn.

        :param count_max: largest vehicle id to draw.
        """
        lane = MAX_PATH - 1
        self._draw_vehicle_tracks(lane, count_max)
        plt.title("time-space in path %s" % lane, fontsize=15)
        plt.xlabel("space")
        plt.ylabel("time", fontsize=15)
        self._set_axis_limits(lane)
        plt.show()
# if direction =='up':
#
# line_number = 0
# while line_number < MAX_PATH:
# print "line number is %s"%line_number
# pltb.subplot(5,1,1+line_numer)
# count =2
# while(count <= count_max):
# x = []
# y = []
# last_cell_amount = 0
# for path_num, road in enumerate(self.path_list):
# path = road.inc_path
# for index, path_list in enumerate(path.recorder[4]):
# # if count == 3:
# # logging.info("[route.plot] self.recorder[line_number]:%s"%self.recorder[line_number])
# appear = False
# try:
# place = path_list.index(count)
# appear = True
# x.append(place+last_cell_amount)
# y.append(index)
# except Exception as e:
# if appear:
# # 车消失了
# break
# last_cell_amount = last_cell_amount + path.cell_amount
# # print "last cell amount : %s"%last_cell_amount
# plt.plot(x,y,'k-')
# count = count + 1
# line_number = line_number + 1
# #plt.title("time-space in single path",fontsize=15)
# #plt.xlabel("space")
# #plt.ylabel("time", fontsize=15)
# plt.show()
"""
公路段类,包含了两个方向的公路,对应excel的一条数据
"""
# class Road(object):
# def __init__(self,):
# """
# 构造一个路段
# """
# self.startpost = resource_item['startpost']# A marker on the road that measures distance in miles from either the start of the route or a state boundary.
# self.endpost = resource_item['endpost']# The average number of cars per day driving on the road.
# self.traffic_amount = resource_item['density']# average daily traffic
# self.path_number = resource_item['path_number']
# self.peak_hours=2
# self.no_peak_hours = 18
# self.peak_ratio = 0.08
# self.no_peak_ratio = 0.8
# self.time_slice = TIME_SLICE
# # self.no_peak_amount_per_hours = self._no_peak_amount_per_hours()
# # 累加的二项分布概率
# self.acc_bi_peak = []
# # self.inc_path=Path(self.increase_dir, self.startpost, self.endpost)
# # self.dec_path=Path(self.decrease_dir, self.startpost, self.endpost)
# def get_Singleton_peak(self,last_amount,direction='up'):
# if self.acc_bi_peak== []:
# self.peak_amount_per_hours = self._peak_amount_per_hours(last_amount)
# probability_of_single_car = (self.peak_amount_per_hours/3600*self.time_slice)/(3600*self.time_slice)
# if direction == 'up':
# probability_of_single_car = probability_of_single_car * self.increase_dir
# amount_per_hours = self.peak_amount_per_hours* self.increase_dir
# else:
# probability_of_single_car = probability_of_single_car * self.decrease_dir
# amount_per_hours = self.peak_amount_per_hours* self.decrease_dir
# self.acc_bi_peak = binomial_creator(amount_per_hours,probability_of_single_car)
# return self.acc_bi_peak
# def get_Singleton_no_peak(self,last_amount,direction='up'):
# if self.acc_bi_no_peak== []:
# self.peak_amount_per_hours = self._peak_amount_per_hours()
# probability_of_single_car = (self.peak_amount_per_hours/3600*self.time_slice)/(3600*self.time_slice)
# if direction == 'up':
# probability_of_single_car = probability_of_single_car * self.increase_dir
# amount_per_hours = self.peak_amount_per_hours* self.increase_dir
#