Table of Contents
1. Data
2. Model Construction
3. Model Training and Evaluation
4. Printing Parameter Count and FLOPs
5. Model Prediction
Appendix: Complete Runnable Code
General steps of the experiment:
1. Data
Download site: MNIST dataset
The original official site no longer serves the dataset (it returns a 403), so I found a similar mirror site to download from.
After downloading, read the data, preprocess it, and split it into training, validation, and test sets:
# Read MNIST image data
def load_images(file_path):
    with gzip.open(file_path, 'rb') as f:
        magic_number = int.from_bytes(f.read(4), 'big')  # read the magic number
        num_images = int.from_bytes(f.read(4), 'big')    # number of images
        num_rows = int.from_bytes(f.read(4), 'big')      # image height
        num_cols = int.from_bytes(f.read(4), 'big')      # image width
        images = np.frombuffer(f.read(), dtype=np.uint8)          # read the pixel data
        images = images.reshape(num_images, num_rows, num_cols)   # reshape to (num_images, 28, 28)
    return images

# Read MNIST label data
def load_labels(file_path):
    with gzip.open(file_path, 'rb') as f:
        magic_number = int.from_bytes(f.read(4), 'big')  # read the magic number
        num_labels = int.from_bytes(f.read(4), 'big')    # number of labels
        labels = np.frombuffer(f.read(), dtype=np.uint8) # read the label data
    return labels

# Load the data
images = load_images('./MNIST/raw/train-images-idx3-ubyte.gz')
labels = load_labels('./MNIST/raw/train-labels-idx1-ubyte.gz')
train_images = images[:1000]
train_labels = labels[:1000]
dev_images = images[1000:1200]
dev_labels = labels[1000:1200]
test_images = images[1200:1400]
test_labels = labels[1200:1400]

# Print the dataset split sizes
print(f'Length of train/dev/test set: {len(train_images)}/{len(dev_images)}/{len(test_images)}')

# Take one sample and display it
image, label = train_images[2], train_labels[2]
image, label = np.array(image).astype('float32'), int(label)
# The raw image data is a length-784 row vector; reshape it to a [28, 28] image
image = np.reshape(image, [28, 28])
image = Image.fromarray(image.astype('uint8'), mode='L')
print("The number in the picture is {}".format(label))
plt.figure(figsize=(5, 5))
plt.imshow(image)
plt.show()

# Define the training, validation, and test sets
train_set = {"images": train_images, "labels": train_labels}
dev_set = {"images": dev_images, "labels": dev_labels}
test_set = {"images": test_images, "labels": test_labels}

# Data preprocessing
transforms = transforms.Compose([transforms.Resize(32), transforms.ToTensor(), transforms.Normalize(mean=[0.5], std=[0.5])])

class MNIST_dataset(Dataset):
    def __init__(self, dataset, transforms, mode='train'):
        self.mode = mode
        self.transforms = transforms
        self.dataset = dataset

    def __getitem__(self, idx):
        # Fetch the image and label from the dict
        image, label = self.dataset["images"][idx], self.dataset["labels"][idx]
        image, label = np.array(image).astype('float32'), int(label)
        image = np.reshape(image, [28, 28])
        image = Image.fromarray(image.astype('uint8'), mode='L')
        image = self.transforms(image)
        return image, label

    def __len__(self):
        # Return the number of images
        return len(self.dataset["images"])

# Build the MNIST datasets
train_dataset = MNIST_dataset(dataset=train_set, transforms=transforms, mode='train')
test_dataset = MNIST_dataset(dataset=test_set, transforms=transforms, mode='test')
dev_dataset = MNIST_dataset(dataset=dev_set, transforms=transforms, mode='dev')
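Note that both loaders read the IDX magic number but never check it. As a small optional sanity check (my addition, not part of the original code; the expected values come from the IDX file format, which uses 2051 for image files and 2049 for label files), the header could be validated like this:

# Sketch: validate the IDX magic numbers read by the loaders above.
def check_magic(magic_number, expected):
    if magic_number != expected:
        raise ValueError(f"bad IDX magic number {magic_number}, expected {expected}")

# e.g. call check_magic(magic_number, 2051) inside load_images
#      and check_magic(magic_number, 2049) inside load_labels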
Output:
The dataset is split successfully.
A sample image is loaded and displayed.
2. Model Construction
import torch
import torch.nn.functional as F
import torch.nn as nn
from torch.nn.init import constant_, normal_, uniform_
from CNN_op import Conv2D, Pool2D

class Model_LeNet(nn.Module):
    def __init__(self, in_channels, num_classes=10):
        super(Model_LeNet, self).__init__()
        # Convolution: 6 output channels, 5x5 kernel
        self.conv1 = Conv2D(in_channels=in_channels, out_channels=6, kernel_size=5)
        # Pooling: 2x2 window, stride 2
        self.pool2 = Pool2D(size=(2, 2), mode='max', stride=2)
        # Convolution: 6 input channels, 16 output channels, 5x5 kernel, stride 1
        self.conv3 = Conv2D(in_channels=6, out_channels=16, kernel_size=5, stride=1)
        # Pooling: 2x2 window, stride 2
        self.pool4 = Pool2D(size=(2, 2), mode='avg', stride=2)
        # Convolution: 16 input channels, 120 output channels, 5x5 kernel
        self.conv5 = Conv2D(in_channels=16, out_channels=120, kernel_size=5, stride=1)
        # Fully connected: 120 in, 84 out
        self.linear6 = nn.Linear(120, 84)
        # Fully connected: 84 in, num_classes out
        self.linear7 = nn.Linear(84, num_classes)

    def forward(self, x):
        # C1: convolution + activation
        output = F.relu(self.conv1(x))
        # S2: pooling
        output = self.pool2(output)
        # C3: convolution + activation
        output = F.relu(self.conv3(output))
        # S4: pooling
        output = self.pool4(output)
        # C5: convolution + activation
        output = F.relu(self.conv5(output))
        # Flatten [B, C, H, W] -> [B, C*H*W] (the C5 output is [B, 120, 1, 1])
        output = torch.squeeze(output, dim=3)
        output = torch.squeeze(output, dim=2)
        # F6: fully connected
        output = F.relu(self.linear6(output))
        # F7: fully connected
        output = self.linear7(output)
        return output

class PyTorch_LeNet(nn.Module):
    def __init__(self, in_channels, num_classes=10):
        super(PyTorch_LeNet, self).__init__()
        # Convolution: 6 output channels, 5x5 kernel
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=6, kernel_size=5)
        # Pooling: 2x2 window, stride 2
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        # Convolution: 6 input channels, 16 output channels, 5x5 kernel
        self.conv3 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)
        # Pooling: 2x2 window, stride 2
        self.pool4 = nn.AvgPool2d(kernel_size=2, stride=2)
        # Convolution: 16 input channels, 120 output channels, 5x5 kernel
        self.conv5 = nn.Conv2d(in_channels=16, out_channels=120, kernel_size=5)
        # Fully connected: 120 in, 84 out
        self.linear6 = nn.Linear(in_features=120, out_features=84)
        # Fully connected: 84 in, num_classes out
        self.linear7 = nn.Linear(in_features=84, out_features=num_classes)

    def forward(self, x):
        # C1: convolution + activation
        output = F.relu(self.conv1(x))
        # S2: pooling
        output = self.pool2(output)
        # C3: convolution + activation
        output = F.relu(self.conv3(output))
        # S4: pooling
        output = self.pool4(output)
        # C5: convolution + activation
        output = F.relu(self.conv5(output))
        # Flatten [B, C, H, W] -> [B, C*H*W]
        output = torch.squeeze(output, dim=3)
        output = torch.squeeze(output, dim=2)
        # F6: fully connected
        output = F.relu(self.linear6(output))
        # F7: fully connected
        output = self.linear7(output)
        return output

# Print each layer's parameters
# Use np.random to create a random array as the input data
inputs = np.random.randn(*[1, 1, 32, 32])
inputs = inputs.astype('float32')
model = PyTorch_LeNet(in_channels=1, num_classes=10)
c = []
for a, b in model.named_children():
    c.append(a)
print(c)
x = torch.tensor(inputs)
for a, item in model.named_children():
    try:
        x = item(x)
    except:
        # Linear layers need a flattened input: reshape and retry
        x = torch.reshape(x, [x.shape[0], -1])
        x = item(x)
    d = []
    e = []
    for b, c in item.named_parameters():
        d.append(b)
        e.append(c)
    if len(e) == 2:
        print(a, x.shape, e[0].shape, e[1].shape)
    else:
        # Pooling layers have no parameters
        print(a, x.shape)
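For reference, the shapes this loop prints can be derived by hand: each 5×5 valid convolution shrinks H and W by 4, and each 2×2 pooling with stride 2 halves them. Starting from [1, 1, 32, 32]: conv1 → [1, 6, 28, 28], pool2 → [1, 6, 14, 14], conv3 → [1, 16, 10, 10], pool4 → [1, 16, 5, 5], conv5 → [1, 120, 1, 1]; the try/except then reshapes [1, 120, 1, 1] to [1, 120], so linear6 → [1, 84] and linear7 → [1, 10].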
Test output:
3. Model Training and Evaluation
The code is as follows:
# Model training
import torch.optim as opt
import torch.utils.data as data
from nndl_3 import RunnerV3, Accuracy

batch_size = 16
train_loader = data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
dev_loader = data.DataLoader(dev_dataset, batch_size=batch_size)
test_loader = data.DataLoader(test_dataset, batch_size=batch_size)

seed = 300
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)                # also seed CUDA when using a GPU
torch.backends.cudnn.deterministic = True   # make cuDNN computation deterministic
torch.backends.cudnn.benchmark = False      # disable auto-tuning that could cause inconsistency
# print("使用自己定义的算子进行训练:================================")
# time1 = time.time()
# model = Model_LeNet(in_channels=1, num_classes=10)
# # 定义优化器
# lr = 0.1
# optimizer = opt.SGD(lr=lr, params=model.parameters())
# # 定义损失函数
# loss_fn = F.cross_entropy
# # 定义评价指标
# metric = Accuracy(is_logist=True)
# # 实例化 RunnerV3 类,并传入训练配置。
# runner = RunnerV3(model, optimizer, loss_fn, metric)
# # 启动训练
# log_steps = 15
# eval_steps = 15
# runner.train(train_loader, dev_loader, num_epochs=10, log_steps=log_steps,
# eval_steps=eval_steps, save_path="best_model.pdparams")
# time2 = time.time()
# print("使用自定义算子运行时间:", time2 - time1)
# # 损失可视化
# from nndl_3 import plot
# plot(runner, 'cnn-loss1.pdf')
#
# # 加载最优模型
# runner.load_model('best_model.pdparams')
# # 模型评价
# score, loss = runner.evaluate(test_loader)
# print("[Test] accuracy/loss: {:.4f}/{:.4f}".format(score, loss))
# print("=================================================================")# 使用torch框架算子进行训练======================================
print("使用框架算子进行训练:=======================================")
time3 = time.time()
model = PyTorch_LeNet(in_channels=1, num_classes=10)
# 定义优化器
lr = 0.1
optimizer = opt.SGD(lr=lr, params=model.parameters())
# 定义损失函数
loss_fn = F.cross_entropy
# 定义评价指标
metric = Accuracy(is_logist=True)
# 实例化 RunnerV3 类,并传入训练配置。
runner = RunnerV3(model, optimizer, loss_fn, metric)
# 启动训练
log_steps = 15
eval_steps = 15
runner.train(train_loader, dev_loader, num_epochs=10, log_steps=log_steps,eval_steps=eval_steps, save_path="best_model.pdparams")
time4 = time.time()
print("使用框架算子的运行时间", time4 - time3)
# 损失可视化
from nndl_3 import plot
plot(runner, 'cnn-loss1.pdf')
# 加载最优模型
runner.load_model('best_model.pdparams')
# 模型评价
score, loss = runner.evaluate(test_loader)
print("[Test] accuracy/loss: {:.4f}/{:.4f}".format(score, loss))
Training-time comparison:
Training with the Conv2D and Pool2D operators hand-written in the previous lab is painfully slow, so I switched to PyTorch's built-in operators; the two runs differed by a full 7 minutes, which shows how fast a mature framework's operators are.
Output:
Note:
An oddity came up during the experiment: the loss and score on the dev set did not change at all across iterations. I went over the code several times without finding the problem; the same thing had happened earlier with the feedforward network. At first I suspected the batch size and learning rate, but that turned out not to be it.
While comparing the hand-written convolution operators against the framework ones, I had accidentally defined model twice, producing the following structure:
Although the model passed into the Runner instance was always an instance of PyTorch_LeNet, re-instantiating model left the optimizer bound to the old instance. The optimizer captures a model's parameters at construction time, so when a new model is created afterwards, the optimizer keeps updating the old model's parameters instead of the new one's. The new model's weights therefore never change during training, which is exactly why the dev accuracy stayed flat.
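A minimal sketch of the pitfall and its fix (variable names follow the training code above):

# The bug in miniature: the optimizer binds the parameters of the model
# that exists when it is constructed.
model = PyTorch_LeNet(in_channels=1, num_classes=10)
optimizer = opt.SGD(lr=0.1, params=model.parameters())
model = PyTorch_LeNet(in_channels=1, num_classes=10)  # accidental re-instantiation
# `optimizer` still updates the OLD model's parameters, so the new model
# never learns and the dev loss/score stay flat.

# Fix: re-create the optimizer so it binds the new model's parameters.
optimizer = opt.SGD(lr=0.1, params=model.parameters())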
Note:
During training I found that the final accuracy depends not only on lr but also on the random seed I set.
With the common choice seed = 42 the accuracy was only 85%, but with seed = 300 it reached 92%. So setting a random seed does not mean every training run behaves the same way: the seed's value also affects the result. The same seed reproduces the same result, but different seeds lead to different random initialization, data shuffling, and optimizer behavior, all of which influence the trained model.
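To keep the seed handling in one place, here is a small helper (my own wrapper bundling the seeding calls used above, plus Python's random module) that makes it easy to rerun the experiment under different seeds:

import random

def set_seed(seed):
    # Bundle all the seeding calls so a run can be reproduced exactly,
    # or compared across seed values (e.g. 42 vs. 300 as above).
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False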
4. Printing Parameter Count and FLOPs
The code is as follows:
# =================================================================
# Parameter count
from torchsummary import summary
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # PyTorch v0.4.0
Torch_model = PyTorch_LeNet(in_channels=1, num_classes=10).to(device)
summary(Torch_model, (1, 32, 32))

# ==================================================================
# Computation count
model = PyTorch_LeNet(in_channels=1, num_classes=10)
dummy_input = torch.randn(1, 1, 32, 32)
# Use torchprofile to count multiply-accumulate operations (MACs)
profile = torchprofile.profile_macs(model, dummy_input)
print(f"MACs: {profile}")
Output:
5. Model Prediction
Use the saved model to predict one sample from the test set and inspect how the model performs.
The code is as follows:
# Model prediction
# Fetch the first batch of the test set
X, label = next(iter(test_loader))
logits = runner.predict(X)
# Multi-class problem: use softmax to compute the predicted probabilities
pred = F.softmax(logits, dim=1)
# Take the class with the highest probability (sample index 2 of the batch)
pred_class = torch.argmax(pred[2]).numpy()
label = label[2].numpy()
# Print the true and predicted classes
print("The true category is {} and the predicted category is {}".format(label, pred_class))
# Visualize the image
plt.figure(figsize=(2, 2))
image, label = test_images[2], test_labels[2]
image = np.array(image).astype('float32')
image = np.reshape(image, [28, 28])
image = Image.fromarray(image.astype('uint8'), mode='L')
plt.imshow(image)
plt.show()
Output:
The prediction is correct!
Appendix: Complete Runnable Code
Main program:
import gzip
import time

import numpy as np
import torchprofile
from matplotlib import pyplot as plt
import PIL.Image as Image
import torchvision.transforms as transforms
from torch.profiler import profile
from torch.utils.data import Dataset, DataLoader

# Read MNIST image data
def load_images(file_path):
    with gzip.open(file_path, 'rb') as f:
        magic_number = int.from_bytes(f.read(4), 'big')  # read the magic number
        num_images = int.from_bytes(f.read(4), 'big')    # number of images
        num_rows = int.from_bytes(f.read(4), 'big')      # image height
        num_cols = int.from_bytes(f.read(4), 'big')      # image width
        images = np.frombuffer(f.read(), dtype=np.uint8)          # read the pixel data
        images = images.reshape(num_images, num_rows, num_cols)   # reshape to (num_images, 28, 28)
    return images

# Read MNIST label data
def load_labels(file_path):
    with gzip.open(file_path, 'rb') as f:
        magic_number = int.from_bytes(f.read(4), 'big')  # read the magic number
        num_labels = int.from_bytes(f.read(4), 'big')    # number of labels
        labels = np.frombuffer(f.read(), dtype=np.uint8) # read the label data
    return labels

# Load the data
images = load_images('./MNIST/raw/train-images-idx3-ubyte.gz')
labels = load_labels('./MNIST/raw/train-labels-idx1-ubyte.gz')
train_images = images[:1000]
train_labels = labels[:1000]
dev_images = images[1000:1200]
dev_labels = labels[1000:1200]
test_images = images[1200:1400]
test_labels = labels[1200:1400]

# Print the dataset split sizes
print(f'Length of train/dev/test set: {len(train_images)}/{len(dev_images)}/{len(test_images)}')

# Take one sample and display it
image, label = train_images[2], train_labels[2]
image, label = np.array(image).astype('float32'), int(label)
# The raw image data is a length-784 row vector; reshape it to a [28, 28] image
image = np.reshape(image, [28, 28])
image = Image.fromarray(image.astype('uint8'), mode='L')
print("The number in the picture is {}".format(label))
plt.figure(figsize=(5, 5))
plt.imshow(image)
plt.show()

# Define the training, validation, and test sets
train_set = {"images": train_images, "labels": train_labels}
dev_set = {"images": dev_images, "labels": dev_labels}
test_set = {"images": test_images, "labels": test_labels}

# Data preprocessing
transforms = transforms.Compose([transforms.Resize(32), transforms.ToTensor(), transforms.Normalize(mean=[0.5], std=[0.5])])

class MNIST_dataset(Dataset):
    def __init__(self, dataset, transforms, mode='train'):
        self.mode = mode
        self.transforms = transforms
        self.dataset = dataset

    def __getitem__(self, idx):
        # Fetch the image and label from the dict
        image, label = self.dataset["images"][idx], self.dataset["labels"][idx]
        image, label = np.array(image).astype('float32'), int(label)
        image = np.reshape(image, [28, 28])
        image = Image.fromarray(image.astype('uint8'), mode='L')
        image = self.transforms(image)
        return image, label

    def __len__(self):
        # Return the number of images
        return len(self.dataset["images"])

# Build the MNIST datasets
train_dataset = MNIST_dataset(dataset=train_set, transforms=transforms, mode='train')
test_dataset = MNIST_dataset(dataset=test_set, transforms=transforms, mode='test')
dev_dataset = MNIST_dataset(dataset=dev_set, transforms=transforms, mode='dev')

import torch
import torch.nn.functional as F
import torch.nn as nn
from torch.nn.init import constant_, normal_, uniform_
from CNN_op import Conv2D, Pool2D

class Model_LeNet(nn.Module):
    def __init__(self, in_channels, num_classes=10):
        super(Model_LeNet, self).__init__()
        # Convolution: 6 output channels, 5x5 kernel
        self.conv1 = Conv2D(in_channels=in_channels, out_channels=6, kernel_size=5)
        # Pooling: 2x2 window, stride 2
        self.pool2 = Pool2D(size=(2, 2), mode='max', stride=2)
        # Convolution: 6 input channels, 16 output channels, 5x5 kernel, stride 1
        self.conv3 = Conv2D(in_channels=6, out_channels=16, kernel_size=5, stride=1)
        # Pooling: 2x2 window, stride 2
        self.pool4 = Pool2D(size=(2, 2), mode='avg', stride=2)
        # Convolution: 16 input channels, 120 output channels, 5x5 kernel
        self.conv5 = Conv2D(in_channels=16, out_channels=120, kernel_size=5, stride=1)
        # Fully connected: 120 in, 84 out
        self.linear6 = nn.Linear(120, 84)
        # Fully connected: 84 in, num_classes out
        self.linear7 = nn.Linear(84, num_classes)

    def forward(self, x):
        # C1: convolution + activation
        output = F.relu(self.conv1(x))
        # S2: pooling
        output = self.pool2(output)
        # C3: convolution + activation
        output = F.relu(self.conv3(output))
        # S4: pooling
        output = self.pool4(output)
        # C5: convolution + activation
        output = F.relu(self.conv5(output))
        # Flatten [B, C, H, W] -> [B, C*H*W] (the C5 output is [B, 120, 1, 1])
        output = torch.squeeze(output, dim=3)
        output = torch.squeeze(output, dim=2)
        # F6: fully connected
        output = F.relu(self.linear6(output))
        # F7: fully connected
        output = self.linear7(output)
        return output

class PyTorch_LeNet(nn.Module):
    def __init__(self, in_channels, num_classes=10):
        super(PyTorch_LeNet, self).__init__()
        # Convolution: 6 output channels, 5x5 kernel
        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=6, kernel_size=5)
        # Pooling: 2x2 window, stride 2
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        # Convolution: 6 input channels, 16 output channels, 5x5 kernel
        self.conv3 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5)
        # Pooling: 2x2 window, stride 2
        self.pool4 = nn.AvgPool2d(kernel_size=2, stride=2)
        # Convolution: 16 input channels, 120 output channels, 5x5 kernel
        self.conv5 = nn.Conv2d(in_channels=16, out_channels=120, kernel_size=5)
        # Fully connected: 120 in, 84 out
        self.linear6 = nn.Linear(in_features=120, out_features=84)
        # Fully connected: 84 in, num_classes out
        self.linear7 = nn.Linear(in_features=84, out_features=num_classes)

    def forward(self, x):
        # C1: convolution + activation
        output = F.relu(self.conv1(x))
        # S2: pooling
        output = self.pool2(output)
        # C3: convolution + activation
        output = F.relu(self.conv3(output))
        # S4: pooling
        output = self.pool4(output)
        # C5: convolution + activation
        output = F.relu(self.conv5(output))
        # Flatten [B, C, H, W] -> [B, C*H*W]
        output = torch.squeeze(output, dim=3)
        output = torch.squeeze(output, dim=2)
        # F6: fully connected
        output = F.relu(self.linear6(output))
        # F7: fully connected
        output = self.linear7(output)
        return output

# Print each layer's parameters
# Use np.random to create a random array as the input data
inputs = np.random.randn(*[1, 1, 32, 32])
inputs = inputs.astype('float32')
model = PyTorch_LeNet(in_channels=1, num_classes=10)
c = []
for a, b in model.named_children():
    c.append(a)
print(c)
x = torch.tensor(inputs)
for a, item in model.named_children():
    try:
        x = item(x)
    except:
        # Linear layers need a flattened input: reshape and retry
        x = torch.reshape(x, [x.shape[0], -1])
        x = item(x)
    d = []
    e = []
    for b, c in item.named_parameters():
        d.append(b)
        e.append(c)
    if len(e) == 2:
        print(a, x.shape, e[0].shape, e[1].shape)
    else:
        # Pooling layers have no parameters
        print(a, x.shape)

# ==========================================================================
# Model training
import torch.optim as opt
import torch.utils.data as data
from nndl_3 import RunnerV3, Accuracy

batch_size = 16
train_loader = data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
dev_loader = data.DataLoader(dev_dataset, batch_size=batch_size)
test_loader = data.DataLoader(test_dataset, batch_size=batch_size)

seed = 300
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)                # also seed CUDA when using a GPU
torch.backends.cudnn.deterministic = True   # make cuDNN computation deterministic
torch.backends.cudnn.benchmark = False      # disable auto-tuning that could cause inconsistency
# print("使用自己定义的算子进行训练:================================")
# time1 = time.time()
# model = Model_LeNet(in_channels=1, num_classes=10)
# # 定义优化器
# lr = 0.1
# optimizer = opt.SGD(lr=lr, params=model.parameters())
# # 定义损失函数
# loss_fn = F.cross_entropy
# # 定义评价指标
# metric = Accuracy(is_logist=True)
# # 实例化 RunnerV3 类,并传入训练配置。
# runner = RunnerV3(model, optimizer, loss_fn, metric)
# # 启动训练
# log_steps = 15
# eval_steps = 15
# runner.train(train_loader, dev_loader, num_epochs=10, log_steps=log_steps,
# eval_steps=eval_steps, save_path="best_model.pdparams")
# time2 = time.time()
# print("使用自定义算子运行时间:", time2 - time1)
# # 损失可视化
# from nndl_3 import plot
# plot(runner, 'cnn-loss1.pdf')
#
# # 加载最优模型
# runner.load_model('best_model.pdparams')
# # 模型评价
# score, loss = runner.evaluate(test_loader)
# print("[Test] accuracy/loss: {:.4f}/{:.4f}".format(score, loss))
# print("=================================================================")# 使用torch框架算子进行训练======================================
print("使用框架算子进行训练:=======================================")
time3 = time.time()
model = PyTorch_LeNet(in_channels=1, num_classes=10)
# 定义优化器
lr = 0.1
optimizer = opt.SGD(lr=lr, params=model.parameters())
# 定义损失函数
loss_fn = F.cross_entropy
# 定义评价指标
metric = Accuracy(is_logist=True)
# 实例化 RunnerV3 类,并传入训练配置。
runner = RunnerV3(model, optimizer, loss_fn, metric)
# 启动训练
log_steps = 15
eval_steps = 15
runner.train(train_loader, dev_loader, num_epochs=10, log_steps=log_steps,eval_steps=eval_steps, save_path="best_model.pdparams")
time4 = time.time()
print("使用框架算子的运行时间", time4 - time3)
# 损失可视化
from nndl_3 import plot
plot(runner, 'cnn-loss1.pdf')
# 加载最优模型
runner.load_model('best_model.pdparams')
# 模型评价
score, loss = runner.evaluate(test_loader)
print("[Test] accuracy/loss: {:.4f}/{:.4f}".format(score, loss))# =================================================================
# Parameter count
from torchsummary import summary
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # PyTorch v0.4.0
Torch_model = PyTorch_LeNet(in_channels=1, num_classes=10).to(device)
summary(Torch_model, (1, 32, 32))

# ==================================================================
# Computation count
model = PyTorch_LeNet(in_channels=1, num_classes=10)
dummy_input = torch.randn(1, 1, 32, 32)
# Use torchprofile to count multiply-accumulate operations (MACs)
profile = torchprofile.profile_macs(model, dummy_input)
print(f"MACs: {profile}")

# =======================================================
# Model prediction
# Fetch the first batch of the test set
X, label = next(iter(test_loader))
logits = runner.predict(X)
# Multi-class problem: use softmax to compute the predicted probabilities
pred = F.softmax(logits, dim=1)
# Take the class with the highest probability (sample index 2 of the batch)
pred_class = torch.argmax(pred[2]).numpy()
label = label[2].numpy()
# Print the true and predicted classes
print("The true category is {} and the predicted category is {}".format(label, pred_class))
# Visualize the image
plt.figure(figsize=(2, 2))
image, label = test_images[2], test_labels[2]
image = np.array(image).astype('float32')
image = np.reshape(image, [28, 28])
image = Image.fromarray(image.astype('uint8'), mode='L')
plt.imshow(image)
plt.show()
nndl_3.py code:
import torch
from matplotlib import pyplot as plt
from torch import nn

class Op(object):
    def __init__(self):
        pass

    def __call__(self, inputs):
        return self.forward(inputs)

    def forward(self, inputs):
        raise NotImplementedError

    def backward(self, inputs):
        raise NotImplementedError

# A two-layer feedforward neural network
class Model_MLP_L2_V3(torch.nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Model_MLP_L2_V3, self).__init__()
        self.fc1 = torch.nn.Linear(input_size, hidden_size)
        w_ = torch.normal(0, 0.01, size=(hidden_size, input_size), requires_grad=True)
        self.fc1.weight = torch.nn.Parameter(w_)
        self.fc1.bias = torch.nn.init.constant_(self.fc1.bias, val=1.0)
        self.fc2 = torch.nn.Linear(hidden_size, output_size)
        w2 = torch.normal(0, 0.01, size=(output_size, hidden_size), requires_grad=True)
        self.fc2.weight = nn.Parameter(w2)
        self.fc2.bias = torch.nn.init.constant_(self.fc2.bias, val=1.0)
        self.act = torch.sigmoid

    def forward(self, inputs):
        outputs = self.fc1(inputs)
        outputs = self.act(outputs)
        outputs = self.fc2(outputs)
        return outputs

class RunnerV3(object):
    def __init__(self, model, optimizer, loss_fn, metric, **kwargs):
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        self.metric = metric  # only used to compute the evaluation metric
        # Evaluation metric over the course of training
        self.dev_scores = []
        # Loss over the course of training
        self.train_epoch_losses = []  # one loss record per epoch
        self.train_step_losses = []   # one loss record per step
        self.dev_losses = []
        # Best metric seen so far
        self.best_score = 0

    def train(self, train_loader, dev_loader=None, **kwargs):
        # Switch the model to training mode
        self.model.train()
        # Number of training epochs; defaults to 0 if not given
        num_epochs = kwargs.get("num_epochs", 0)
        # Logging frequency; defaults to 100 if not given
        log_steps = kwargs.get("log_steps", 100)
        # Evaluation frequency
        eval_steps = kwargs.get("eval_steps", 0)
        # Model save path; defaults to "best_model.pdparams"
        save_path = kwargs.get("save_path", "best_model.pdparams")
        custom_print_log = kwargs.get("custom_print_log", None)
        # Total number of training steps
        num_training_steps = num_epochs * len(train_loader)
        if eval_steps:
            if self.metric is None:
                raise RuntimeError('Error: Metric can not be None!')
            if dev_loader is None:
                raise RuntimeError('Error: dev_loader can not be None!')
        # Number of steps run so far
        global_step = 0
        # Train for num_epochs epochs
        for epoch in range(num_epochs):
            # Accumulate the training-set loss
            total_loss = 0
            for step, data in enumerate(train_loader):
                X, y = data
                # Get the model predictions
                logits = self.model(X)
                loss = self.loss_fn(logits, y)  # mean reduction by default
                total_loss += loss
                # Record the loss at every step
                self.train_step_losses.append((global_step, loss.item()))
                if log_steps and global_step % log_steps == 0:
                    print(f"[Train] epoch: {epoch}/{num_epochs}, step: {global_step}/{num_training_steps}, loss: {loss.item():.5f}")
                # Backpropagate to compute each parameter's gradient
                loss.backward()
                if custom_print_log:
                    custom_print_log(self)
                # Mini-batch gradient descent parameter update
                self.optimizer.step()
                # Zero the gradients
                self.optimizer.zero_grad()
                # Check whether evaluation is due
                if eval_steps > 0 and global_step > 0 and \
                        (global_step % eval_steps == 0 or global_step == (num_training_steps - 1)):
                    dev_score, dev_loss = self.evaluate(dev_loader, global_step=global_step)
                    print(f"[Evaluate]  dev score: {dev_score:.5f}, dev loss: {dev_loss:.5f}")
                    # Switch the model back to training mode
                    self.model.train()
                    # Save the model if the current metric is the best so far
                    if dev_score > self.best_score:
                        self.save_model(save_path)
                        print(f"[Evaluate] best accuracy performance has been updated: {self.best_score:.5f} --> {dev_score:.5f}")
                        self.best_score = dev_score
                global_step += 1
            # Accumulated training loss of the current epoch
            trn_loss = (total_loss / len(train_loader)).item()
            # Record the epoch-level training loss
            self.train_epoch_losses.append(trn_loss)
        print("[Train] Training done!")

    # Evaluation: use torch.no_grad() to skip gradient computation and storage
    @torch.no_grad()
    def evaluate(self, dev_loader, **kwargs):
        assert self.metric is not None
        # Switch the model to evaluation mode
        self.model.eval()
        global_step = kwargs.get("global_step", -1)
        # Accumulate the loss
        total_loss = 0
        # Reset the metric
        self.metric.reset()
        # Iterate over every batch of the validation set
        for batch_id, data in enumerate(dev_loader):
            X, y = data
            # Compute the model output
            logits = self.model(X)
            # Compute the loss
            loss = self.loss_fn(logits, y).item()
            # Accumulate the loss
            total_loss += loss
            # Accumulate the metric
            self.metric.update(logits, y)
        dev_loss = (total_loss / len(dev_loader))
        dev_score = self.metric.accumulate()
        # Record the validation loss
        if global_step != -1:
            self.dev_losses.append((global_step, dev_loss))
            self.dev_scores.append(dev_score)
        return dev_score, dev_loss

    # Prediction: use torch.no_grad() to skip gradient computation and storage
    @torch.no_grad()
    def predict(self, x, **kwargs):
        # Switch the model to evaluation mode
        self.model.eval()
        # Run the forward pass to get predictions
        logits = self.model(x)
        return logits

    def save_model(self, save_path):
        torch.save(self.model.state_dict(), save_path)

    def load_model(self, model_path):
        model_state_dict = torch.load(model_path)
        self.model.load_state_dict(model_state_dict)

class Accuracy():
    def __init__(self, is_logist=True):
        # Count of correctly classified samples
        self.num_correct = 0
        # Total sample count
        self.num_count = 0
        self.is_logist = is_logist

    def update(self, outputs, labels):
        if outputs.shape[1] == 1:  # binary classification
            outputs = torch.squeeze(outputs, dim=-1)
            if self.is_logist:
                # For logits, predict class 1 when the value is >= 0
                preds = torch.tensor((outputs >= 0), dtype=torch.float32)
            else:
                # Otherwise treat probabilities >= 0.5 as class 1, else class 0
                preds = torch.tensor((outputs >= 0.5), dtype=torch.float32)
        else:
            # For multi-class, take the index of the largest value via torch.argmax
            preds = torch.argmax(outputs, dim=1)
        # Number of correct predictions in this batch
        labels = torch.squeeze(labels, dim=-1)
        batch_correct = torch.sum(torch.tensor(preds == labels, dtype=torch.float32)).numpy()
        batch_count = len(labels)
        # Update num_correct and num_count
        self.num_correct += batch_correct
        self.num_count += batch_count

    def accumulate(self):
        # Compute the overall metric from the accumulated counts
        if self.num_count == 0:
            return 0
        return self.num_correct / self.num_count

    def reset(self):
        # Reset the counts
        self.num_correct = 0
        self.num_count = 0

    def name(self):
        return "Accuracy"

# Visualization
def plot(runner, fig_name):
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    train_items = runner.train_step_losses[::30]
    train_steps = [x[0] for x in train_items]
    train_losses = [x[1] for x in train_items]
    plt.plot(train_steps, train_losses, color='#8E004D', label="Train loss")
    if runner.dev_losses[0][0] != -1:
        dev_steps = [x[0] for x in runner.dev_losses]
        dev_losses = [x[1] for x in runner.dev_losses]
        plt.plot(dev_steps, dev_losses, color='#E20079', linestyle='--', label="Dev loss")
    # Axes and legend
    plt.ylabel("loss", fontsize='x-large')
    plt.xlabel("step", fontsize='x-large')
    plt.legend(loc='upper right', fontsize='x-large')
    plt.subplot(1, 2, 2)
    # Plot the dev accuracy curve
    if runner.dev_losses[0][0] != -1:
        plt.plot(dev_steps, runner.dev_scores,
                 color='#E20079', linestyle="--", label="Dev accuracy")
    else:
        plt.plot(list(range(len(runner.dev_scores))), runner.dev_scores,
                 color='#E20079', linestyle="--", label="Dev accuracy")
    # Axes and legend
    plt.ylabel("score", fontsize='x-large')
    plt.xlabel("step", fontsize='x-large')
    plt.legend(loc='lower right', fontsize='x-large')
    plt.savefig(fig_name)
    plt.show()
That's all for this post; see you next time~