import torch
import torch.nn as nn
import math
import torch.nn.init as init
import torch.nn.functional as F
# class LayerNorm(nn.Module):
# def __init__(self, hidden_size, eps=1e-12):
# """Construct a layernorm module in the TF style (epsilon inside the square root).
# """
# super(LayerNorm, self).__init__()
# self.weight = nn.Parameter(torch.ones(hidden_size))
# self.bias = nn.Parameter(torch.zeros(hidden_size))
# self.variance_epsilon = eps
#
# def forward(self, x):
# u = x.mean(-1, keepdim=True)
# s = (x - u).pow(2).mean(-1, keepdim=True)
# x = (x - u) / torch.sqrt(s + self.variance_epsilon)
# return self.weight * x + self.bias
#
#
# class SelfAttention(nn.Module):
# def __init__(self, num_attention_heads, input_size, hidden_size, hidden_dropout_prob):
# super(SelfAttention, self).__init__()
# if hidden_size % num_attention_heads != 0:
# raise ValueError(
# "The hidden size (%d) is not a multiple of the number of attention "
# "heads (%d)" % (hidden_size, num_attention_heads))
# self.num_attention_heads = num_attention_heads
# self.attention_head_size = int(hidden_size / num_attention_heads)
# self.all_head_size = hidden_size
#
# self.query = nn.Linear(input_size, self.all_head_size)
# self.key = nn.Linear(input_size, self.all_head_size)
# self.value = nn.Linear(input_size, self.all_head_size)
#
# self.attn_dropout = nn.Dropout(0.1)
#
# # After self-attention, apply a feed-forward dense layer, dropout, and LayerNorm to produce the output
# self.dense = nn.Linear(hidden_size, hidden_size)
# self.LayerNorm = LayerNorm(hidden_size, eps=1e-12)
# self.out_dropout = nn.Dropout(hidden_dropout_prob)
#
# def transpose_for_scores(self, x):
# new_x_shape = x.size()[:-1] + (self.num_attention_heads, self.attention_head_size)
# x = x.view(*new_x_shape)
# return x.permute(0, 2, 1, 3)
#
# def forward(self, input_tensor):
# mixed_query_layer = self.query(input_tensor)
# mixed_key_layer = self.key(input_tensor)
# mixed_value_layer = self.value(input_tensor)
#
# query_layer = self.transpose_for_scores(mixed_query_layer)
# key_layer = self.transpose_for_scores(mixed_key_layer)
# value_layer = self.transpose_for_scores(mixed_value_layer)
#
# # Take the dot product between "query" and "key" to get the raw attention scores.
# attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))
#
# attention_scores = attention_scores / math.sqrt(self.attention_head_size)
# # Apply the attention mask (precomputed for all layers in the BertModel forward() function)
# # [batch_size heads seq_len seq_len] scores
# # [batch_size 1 1 seq_len]
#
# # attention_scores = attention_scores + attention_mask
#
# # Normalize the attention scores to probabilities.
# attention_probs = nn.Softmax(dim=-1)(attention_scores)
# # This is actually dropping out entire tokens to attend to, which might
# # seem a bit unusual, but is taken from the original Transformer paper.
# # Fixme
# attention_probs = self.attn_dropout(attention_probs)
# context_layer = torch.matmul(attention_probs, value_layer)
# context_layer = context_layer.permute(0, 2, 1, 3).contiguous()
# new_context_layer_shape = context_layer.size()[:-2] + (self.all_head_size,)
# context_layer = context_layer.view(*new_context_layer_shape)
# hidden_states = self.dense(context_layer)
# hidden_states = self.out_dropout(hidden_states)
# hidden_states = self.LayerNorm(hidden_states + input_tensor)
#
# return hidden_states
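# GraphConv: first-order graph convolution. `gso` is the (n_vertex, n_vertex) graph
# shift operator; input and output follow the (bs, channels, n_vertex, ts) layout.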
class GraphConv(nn.Module):
def __init__(self, c_in, c_out, gso, bias):
        super(GraphConv, self).__init__()  # call the parent nn.Module constructor
self.c_in = c_in
self.c_out = c_out
self.gso = gso
self.weight = nn.Parameter(torch.FloatTensor(c_in, c_out))
if bias:
self.bias = nn.Parameter(torch.FloatTensor(c_out))
else:
self.register_parameter('bias', None)
self.reset_parameters()
def reset_parameters(self):
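        # same scheme as nn.Linear.reset_parameters(): Kaiming-uniform weight, bias uniform in ±1/sqrt(fan_in)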
init.kaiming_uniform_(self.weight, a=math.sqrt(5))
if self.bias is not None:
fan_in, _ = init._calculate_fan_in_and_fan_out(self.weight)
bound = 1 / math.sqrt(fan_in) if fan_in > 0 else 0
init.uniform_(self.bias, -bound, bound)
def forward(self, x):
        # bs, c_in, n_vertex, ts = x.shape
x = torch.permute(x, (0, 3, 2, 1))
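        # x is now (bs, ts, n_vertex, c_in); aggregate over neighbours with gso, then mix channels with the weight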
first_mul = torch.einsum('hi,btij->bthj', self.gso, x)
second_mul = torch.einsum('bthi,ij->bthj', first_mul, self.weight)
if self.bias is not None:
graph_conv = torch.add(second_mul, self.bias)
else:
graph_conv = second_mul
graph_conv = torch.permute(graph_conv, (0, 3, 2, 1))
return graph_conv
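# ChebGraphConv: Chebyshev graph convolution of order Ks. It builds the Chebyshev terms
# T_0(L)x = x, T_1(L)x = Lx, T_k(L)x = 2L T_{k-1}(L)x - T_{k-2}(L)x (with L = gso), then
# mixes them through the learned (Ks, c_in, c_out) weight tensor.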
class ChebGraphConv(nn.Module):
def __init__(self, c_in, c_out, Ks, gso, bias):
super(ChebGraphConv, self).__init__()
self.c_in = c_in
self.c_out = c_out
self.Ks = Ks
self.gso = gso
self.weight = nn.Parameter(torch.FloatTensor(Ks, c_in, c_out))
if bias:
self.bias = nn.Parameter(torch.FloatTensor(c_out))
else:
self.register_parameter('bias', None)
self.reset_parameters()
def reset_parameters(self):
init.kaiming_uniform_(self.weight, a=math.sqrt(5))
if self.bias is not None:
fan_in, _ = init._calculate_fan_in_and_fan_out(self.weight)
bound = 1 / math.sqrt(fan_in) if fan_in > 0 else 0
init.uniform_(self.bias, -bound, bound)
def forward(self, x):
#bs, c_in, n, ts = x.shape
#print(self.gso)
x = torch.permute(x, (0, 3, 2, 1))
if self.Ks - 1 < 0:
raise ValueError(
f'ERROR: the graph convolution kernel size Ks has to be a positive integer, but received {self.Ks}.')
elif self.Ks - 1 == 0:
x_0 = x
x_list = [x_0]
elif self.Ks - 1 == 1:
x_0 = x
x_1 = torch.einsum('hi,btij->bthj', self.gso, x)
x_list = [x_0, x_1]
elif self.Ks - 1 >= 2:
x_0 = x
x_1 = torch.einsum('hi,btij->bthj', self.gso, x)
x_list = [x_0, x_1]
for k in range(2, self.Ks):
x_list.append(torch.einsum('hi,btij->bthj', 2 * self.gso, x_list[k - 1]) - x_list[k - 2])
x = torch.stack(x_list, dim=2)
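        # x: (bs, ts, Ks, n_vertex, c_in); contract the Ks and c_in axes against the (Ks, c_in, c_out) weight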
cheb_graph_conv = torch.einsum('btkhi,kij->bthj', x, self.weight)
if self.bias is not None:
cheb_graph_conv = torch.add(cheb_graph_conv, self.bias)
else:
            cheb_graph_conv = cheb_graph_conv  # no bias; shape (bs, ts, n_vertex, c_out), e.g. (b, 12, 207, 64)
cheb_graph_conv = torch.permute(cheb_graph_conv, (0, 3, 2, 1))
return cheb_graph_conv
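# LinearProjection: collapses the vertex dimension with a (1, node) convolution and a tanh,
# returning a sequence of shape (bs, ts, 2*c_in).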
class LinearProjection(nn.Module):
def __init__(self,c_in,node):
super(LinearProjection,self).__init__()
self.conv = nn.Conv2d(c_in,2*c_in,(1,node)) #hidden=64
self.relu = nn.LeakyReLU()
self.bn = nn.BatchNorm2d(2*c_in)
self.tanh = nn.Tanh()
def forward(self,x):
# bs, c_in, n, ts
        x = x.permute(0,1,3,2)  # -> (bs, c_in, ts, n_vertex), e.g. [batch, 64, 12, 207]; the ttnet3 variant uses (0, 2, 3, 1)
x = self.conv(x) #[batch,128,12,1]
x = self.tanh(x)
#x = self.bn(x) #bs,2*c_in,ts,1
x = x.permute(0,2,1,3)
x = x.reshape(x.size(0),x.size(1),x.size(2)*x.size(3)) #bs,ts,2*c_in
return x
class LearnedPositionalEncoding(nn.Embedding):
    # NOTE: the body of this class is truncated in the source. A minimal learned
    # positional encoding built on nn.Embedding is sketched here as an assumption.
    def forward(self, x):
        # x: (bs, ts, d_model) -> add one learned vector per time-step index
        positions = torch.arange(x.size(1), device=x.device)
        return x + super(LearnedPositionalEncoding, self).forward(positions)
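

# A minimal smoke test (not part of the original file). The sizes below
# (batch 8, 64 channels, 207 vertices, 12 time steps) are assumptions taken from
# the inline shape comments above, and the random gso is a stand-in for a real
# graph shift operator (e.g. a normalized adjacency or scaled Laplacian).
if __name__ == '__main__':
    bs, c_in, n_vertex, ts = 8, 64, 207, 12
    x = torch.randn(bs, c_in, n_vertex, ts)
    gso = torch.randn(n_vertex, n_vertex)

    gc = GraphConv(c_in=c_in, c_out=16, gso=gso, bias=True)
    print(gc(x).shape)    # torch.Size([8, 16, 207, 12])

    cheb = ChebGraphConv(c_in=c_in, c_out=16, Ks=3, gso=gso, bias=True)
    print(cheb(x).shape)  # torch.Size([8, 16, 207, 12])

    proj = LinearProjection(c_in=c_in, node=n_vertex)
    print(proj(x).shape)  # torch.Size([8, 12, 128])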