import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms,datasets
np.random.seed(42)
# Fix NumPy's random seed (42) so NumPy-based randomness is reproducible
torch.manual_seed(42)
# With torch.manual_seed fixed, successive torch.rand() calls still give different numbers
# within a run, but re-running the script from the top reproduces the same sequence
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.0,), (1.0,))])
# Image preprocessing: Compose chains several transforms into one pipeline.
# ToTensor converts a PIL.Image or numpy ndarray of shape (H, W, C) into a (C, H, W) tensor
# and divides every value by 255, scaling pixels into [0, 1].
# Normalize subtracts the given mean and divides by the given std per channel; with mean 0.0
# and std 1.0 it leaves the values in [0, 1] (the clamp to [0, 1] in the attacks below relies on this).
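# A quick check (a minimal sketch, not part of the pipeline) of what Normalize does per
# channel: output = (input - mean) / std, so mean 0.0 and std 1.0 is effectively a no-op.
# x = torch.rand(1, 28, 28)
# assert torch.equal(transforms.Normalize((0.0,), (1.0,))(x), x)
# The commonly used MNIST statistics would instead be Normalize((0.1307,), (0.3081,)).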
dataset = datasets.MNIST(root = './data', train=True, transform = transform, download=True)
# torchvision.datasets.MNIST loads the MNIST dataset
# train=True loads the training split
# transform applies the preprocessing pipeline defined above to every image
# download=True downloads the data automatically when it is not already present under root
train_set, val_set = torch.utils.data.random_split(dataset, [50000, 10000])
# random_split splits a dataset into non-overlapping subsets of the given lengths
# (here 50,000 for training and 10,000 for validation); a fixed generator can be passed
# for reproducible splits.
# dataset (Dataset) - the dataset to split
# lengths (sequence) - the lengths of the splits
# generator (Generator) - generator used for the random permutation
test_set = datasets.MNIST(root = './data', train=False, transform = transform, download=True)
train_loader = torch.utils.data.DataLoader(train_set, batch_size=1, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_set, batch_size=1, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=1, shuffle=True)
# Wrap the three datasets in iterable DataLoaders; each iteration of a for loop yields one
# batch from the loader. batch_size=1 means one image per batch, which is slow for training
# but is assumed by the attack/evaluation code below (it calls .item() on single predictions).
print("Training data:",len(train_loader),"Validation data:",len(val_loader),"Test data: ",len(test_loader))
use_cuda=True
device = torch.device("cuda" if (use_cuda and torch.cuda.is_available()) else "cpu")
# Tensors and the model built later are moved to this device: the GPU when use_cuda is set
# and CUDA is available, otherwise the CPU.
class Net(nn.Module):  # define the classification network
    def __init__(self):
        super(Net, self).__init__()  # call the parent constructor first
        # Two convolutional layers:
        # conv1 takes a 1-channel (grayscale) image and produces 32 feature maps with 3x3 kernels;
        # conv2 takes those 32 maps and produces 64 feature maps, again with 3x3 kernels.
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout2d(0.5)
        # Dropout2d operates on (N, C, H, W) inputs and zeroes out entire channels with the
        # given probability. Note that dropout2 is applied to the flattened (N, 128) tensor
        # after fc1, where a plain nn.Dropout would be the more idiomatic choice.
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)
        # Layers with learnable parameters (convolutions, fully connected layers) are usually
        # defined in the constructor __init__().
    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output
    # Layers without learnable parameters (such as ReLU or dropout) can be defined in the
    # constructor or simply used as functions inside forward().
    # forward() must be overridden: it defines how the layers are wired together.
model = Net().to(device)
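# Optional shape check (a minimal sketch): a 28x28 input becomes 26x26 after conv1 (3x3, no
# padding), 24x24 after conv2, and 12x12 after the 2x2 max pool, so the flattened features
# have 64 * 12 * 12 = 9216 entries, matching fc1's input size.
# with torch.no_grad():
#     print(model(torch.zeros(1, 1, 28, 28, device=device)).shape)  # torch.Size([1, 10])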
optimizer = optim.Adam(model.parameters(),lr=0.0001, betas=(0.9, 0.999))
criterion = nn.NLLLoss()
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)
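# The network ends with log_softmax, so NLLLoss is the matching criterion (together they are
# equivalent to CrossEntropyLoss on raw logits). ReduceLROnPlateau multiplies the learning
# rate by `factor` (0.1) once the monitored quantity has not improved for `patience` (3)
# epochs; fit() below passes it the per-epoch validation loss.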
def fit(model, device, train_loader, val_loader, epochs):
    data_loader = {'train': train_loader, 'val': val_loader}
    print("Fitting the model...")
    train_loss, val_loss = [], []
    for epoch in range(epochs):
        loss_per_epoch, val_loss_per_epoch = 0, 0
        for phase in ('train', 'val'):
            # switch between training mode (dropout active) and evaluation mode
            model.train(phase == 'train')
            for i, data in enumerate(data_loader[phase]):
                input, label = data[0].to(device), data[1].to(device)
                output = model(input)
                # compute the loss on the output
                loss = criterion(output, label)
                if phase == 'train':
                    optimizer.zero_grad()
                    # gradients w.r.t. the loss
                    loss.backward()
                    # update the weights
                    optimizer.step()
                    loss_per_epoch += loss.item()
                else:
                    val_loss_per_epoch += loss.item()
        scheduler.step(val_loss_per_epoch/len(val_loader))
        print("Epoch: {} Loss: {} Val_Loss: {}".format(epoch+1, loss_per_epoch/len(train_loader), val_loss_per_epoch/len(val_loader)))
        train_loss.append(loss_per_epoch/len(train_loader))
        val_loss.append(val_loss_per_epoch/len(val_loader))
    return train_loss, val_loss
loss,val_loss=fit(model,device,train_loader,val_loader,10)
fig = plt.figure(figsize=(5,5))
plt.plot(np.arange(1,11), loss, "*-",label="Loss")
plt.plot(np.arange(1,11), val_loss,"o-",label="Val Loss")
plt.xlabel("Num of epochs")
plt.legend()
plt.show()
def fgsm_attack(input, epsilon, data_grad):
    pert_out = input + epsilon*data_grad.sign()
    pert_out = torch.clamp(pert_out, 0, 1)
    return pert_out
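# FGSM in one step: x_adv = clamp(x + epsilon * sign(grad_x J(theta, x, y)), 0, 1).
# The clamp to [0, 1] assumes inputs in that range, which holds here because
# Normalize((0.0,), (1.0,)) leaves ToTensor's [0, 1] output unchanged.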
def ifgsm_attack(input, epsilon, data_grad):
    num_iter = 10
    alpha = epsilon/num_iter
    pert_out = input
    for i in range(num_iter):
        pert_out = pert_out + alpha*data_grad.sign()
        pert_out = torch.clamp(pert_out, 0, 1)
        if torch.norm((pert_out-input), p=float('inf')) > epsilon:
            break
    return pert_out
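# Note: this is a simplified I-FGSM sketch. It reuses the gradient computed at the original
# input for every step (the full iterative attack recomputes the gradient at each perturbed
# image) and stops early once the accumulated perturbation exceeds epsilon in the L-inf norm.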
def mifgsm_attack(input, epsilon, data_grad):
    num_iter = 10
    decay_factor = 1.0
    pert_out = input
    alpha = epsilon/num_iter
    g = 0
    for i in range(num_iter):
        g = decay_factor*g + data_grad/torch.norm(data_grad, p=1)
        pert_out = pert_out + alpha*torch.sign(g)
        pert_out = torch.clamp(pert_out, 0, 1)
        if torch.norm((pert_out-input), p=float('inf')) > epsilon:
            break
    return pert_out
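# MI-FGSM accumulates a momentum term g <- decay_factor * g + grad / ||grad||_1 and steps
# along sign(g). As above, this sketch reuses the initial gradient rather than recomputing
# it at each iterate.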
def test(model, device, test_loader, epsilon, attack):
    correct = 0
    adv_examples = []
    for data, target in test_loader:
        data, target = data.to(device), target.to(device)
        data.requires_grad = True
        output = model(data)
        init_pred = output.max(1, keepdim=True)[1]
        # skip samples the clean model already misclassifies
        if init_pred.item() != target.item():
            continue
        loss = F.nll_loss(output, target)
        model.zero_grad()
        loss.backward()
        data_grad = data.grad.data
        if attack == "fgsm":
            perturbed_data = fgsm_attack(data, epsilon, data_grad)
        elif attack == "ifgsm":
            perturbed_data = ifgsm_attack(data, epsilon, data_grad)
        elif attack == "mifgsm":
            perturbed_data = mifgsm_attack(data, epsilon, data_grad)
        output = model(perturbed_data)
        final_pred = output.max(1, keepdim=True)[1]
        if final_pred.item() == target.item():
            correct += 1
            if (epsilon == 0) and (len(adv_examples) < 5):
                adv_ex = perturbed_data.squeeze().detach().cpu().numpy()
                adv_examples.append((init_pred.item(), final_pred.item(), adv_ex))
        else:
            if len(adv_examples) < 5:
                adv_ex = perturbed_data.squeeze().detach().cpu().numpy()
                adv_examples.append((init_pred.item(), final_pred.item(), adv_ex))
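    # The original snippet is cut off at this point. A minimal assumed completion: report the
    # accuracy over the test set and return it together with the saved adversarial examples.
    final_acc = correct / float(len(test_loader))
    print("Epsilon: {}\tTest Accuracy = {}/{} = {:.4f}".format(epsilon, correct, len(test_loader), final_acc))
    return final_acc, adv_examples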