Pytorch - ResNet - pretrain

  • 直接使用torchvision.models提供的resnet18模型

  • ImageNet数据集太大,采用CIFAR-10

下载数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import torch
import torchvision
import torchvision.transforms as transforms

# Preprocessing pipeline: ResNet expects 224x224 inputs; convert PIL images
# to tensors, then shift each channel to roughly [-1, 1].
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# CIFAR-10 datasets: root is the download path, train selects the split,
# download fetches the archive if missing, transform applies preprocessing.
trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=transform)
testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                       download=True, transform=transform)

# Loaders: batches of 32; shuffle only the training split; 2 worker
# subprocesses feed each loader.
trainloader = torch.utils.data.DataLoader(trainset, batch_size=32,
                                          shuffle=True, num_workers=2)
testloader = torch.utils.data.DataLoader(testset, batch_size=32,
                                         shuffle=False, num_workers=2)

classes = ('plane', 'car', 'bird', 'cat', 'deer',
           'dog', 'frog', 'horse', 'ship', 'truck')
  • 上面是采用CIFAR-10数据集,当然也可以使用torch.utils.data.Subset选取部分ImageNet数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Subset
import numpy as np

# Preprocessing with the standard ImageNet channel statistics.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
])

# Full ImageNet training split (the archives must already live under ./data).
full_dataset = torchvision.datasets.ImageNet(root='./data', split='train', transform=transform)

# How many samples to carve out of ImageNet for each split.
train_size = 10000  # training subset
test_size = 2000    # test subset

# Shuffle all dataset indices once, then slice two disjoint ranges.
indices = np.random.permutation(len(full_dataset))
train_indices = indices[:train_size]
test_indices = indices[train_size:train_size + test_size]

# Wrap the index lists as dataset views — no data is copied.
train_subset = Subset(full_dataset, train_indices)
test_subset = Subset(full_dataset, test_indices)

# Loaders over the two subsets.
train_loader = DataLoader(train_subset, batch_size=32, shuffle=True, num_workers=4)
test_loader = DataLoader(test_subset, batch_size=32, shuffle=False, num_workers=4)

数据进一步划分

  • 上面已经对数据进行下载、转换Tensor、归一化,预处理完成
  • 接下来,可以对部分训练数据划分为验证集
1
2
3
4
5
6
7
8
9
10
from sklearn.model_selection import train_test_split

# Hold out 20% of the training indices as a validation split.
train_indices, val_indices = train_test_split(list(range(len(trainset))), test_size=0.2)

# Wrap each index list as a dataset view over the original trainset.
train_subset = torch.utils.data.Subset(trainset, train_indices)
val_subset = torch.utils.data.Subset(trainset, val_indices)

# Shuffle only the training loader; the validation order can stay fixed.
trainloader = torch.utils.data.DataLoader(train_subset, batch_size=32, shuffle=True, num_workers=2)
valloader = torch.utils.data.DataLoader(val_subset, batch_size=32, shuffle=False, num_workers=2)
  • 如果有包没有下载,可以在对应虚拟环境中进行下载
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# List the conda environments on this machine
conda env list
# Activate the target environment
conda activate pytorch
# List installed packages and versions, to avoid version conflicts
# (the original snippet said `pip install` with no arguments, which is not
# a query command — `pip list` is)
pip list
# Install a compatible release (scikit-learn must match numpy and SciPy)
pip install scikit-learn==1.2.0
# Verify the versions from Python (`pip show <pkg>` also works)
python - <<'EOF'
import numpy as np
import scipy
import sklearn
print(f"NumPy version: {np.__version__}")
print(f"SciPy version: {scipy.__version__}")
print(f"scikit-learn version: {sklearn.__version__}")
EOF

网络配置与网络构建

  • 采用预训练的resnet模型,微调

nn.Linear

  • in_features: 上一层输出的特征数量,即这一层输入特征的数量。
  • out_features: 该层输出特征的数量,也就是这一层神经元的数量。
  • bias: 默认为True,每个输出节点加上的一个固定偏移量。
1
2
3
4
5
6
7
8
9
10
11
12
13
import torch.nn as nn
import torch.optim as optim
from torchvision import models

# Start from ImageNet-pretrained ResNet18 weights.
net = models.resnet18(pretrained=True)

# Swap the final fully-connected layer for a 10-way head (CIFAR-10 classes).
in_features = net.fc.in_features
net.fc = nn.Linear(in_features, 10)

# Run on the first GPU when available, otherwise on the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
net.to(device)

定义损失函数与优化函数

CrossEntropyLoss()

  • 交叉熵损失函数计算的是模型输出(经过softmax函数转换为概率分布)与实际标签的交叉熵,目的是将模型预测的概率分布尽可能接近真实的分布。这种损失函数特别适合用于分类任务中,因为它同时考虑了所有类别的预测概率。在使用nn.CrossEntropyLoss时,模型最后一层的输出不应该通过softmax层,因为nn.CrossEntropyLoss内部会对输出进行softmax处理。

SGD()

  • 随机梯度下降优化器。根据模型反向传播的梯度信息更新模型的权重,以减少损失函数的值。net.parameters()为要优化的参数,lr学习率,momentum动量,是一个用来加速SGD在相关方向上前进,并抑制震荡的参数。动量通过在梯度下降过程中累积过去梯度的一定比例,来影响当前步骤的更新方向。
1
2
# Cross-entropy loss (applies softmax internally) and SGD with momentum 0.9.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

训练

torch.utils.data.DataLoader

  • 在一个训练循环中遍历trainloader时,每次迭代会返回一批数据。这批数据通常是一个包含两个元素的元组:数据(Inputs):一批特定大小(这里是32)的输入数据,通常是一个张量(Tensor)。这个张量的形状通常是(batch_size, channels, height, width),对于非图像数据,形状可能不同。标签(Labels):与这批数据相对应的标签或目标,也是一个张量。对于分类任务,标签张量的形状通常是(batch_size,),即每个样本有一个标签。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def train_model(net, trainloader, criterion, optimizer, num_epochs=10):
    """Fine-tune `net` on `trainloader`, logging the mean loss every 100 batches."""
    net.train()  # training-mode behavior for dropout / batch-norm
    for epoch in range(num_epochs):
        running_loss = 0.0  # loss accumulated since the last report
        for batch_idx, (inputs, labels) in enumerate(trainloader):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()              # clear gradients before the update
            outputs = net(inputs)              # forward pass on this batch
            loss = criterion(outputs, labels)  # batch loss
            loss.backward()                    # backpropagate
            optimizer.step()                   # apply the gradient update

            running_loss += loss.item()
            # Report the average loss over each window of 100 batches.
            if batch_idx % 100 == 99:
                print(f'[Epoch {epoch + 1}, Batch {batch_idx + 1}] loss: {running_loss / 100:.3f}')
                running_loss = 0.0

    print('Finished Training')

# Start training.
train_model(net, trainloader, criterion, optimizer, num_epochs=10)

验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def validate_model(net, valloader, criterion):
    """Evaluate `net` on `valloader` and print mean loss plus accuracy."""
    net.eval()  # inference-mode behavior for dropout / batch-norm
    val_loss = 0.0
    correct = 0
    total = 0

    # No gradients are needed during evaluation.
    with torch.no_grad():
        for images, labels in valloader:
            images, labels = images.to(device), labels.to(device)

            outputs = net(images)
            val_loss += criterion(outputs, labels).item()

            # The predicted class is the argmax over the 10 logits.
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    print(f'Validation Loss: {val_loss / len(valloader):.3f}')
    print(f'Validation Accuracy: {100 * correct / total:.2f}%')

# Run validation.
validate_model(net, valloader, criterion)
# Validation Loss: 0.163
# Validation Accuracy: 95.40%

Pytorch - ResNet - 自建网络层

导库

1
2
3
4
5
6
7
8
9
10
11
12
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, utils
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt
import numpy as np
from torch.utils.data.dataset import Dataset
from torchvision.transforms import transforms
from pathlib import Path
from PIL import Image
import torch.nn.functional as F

下载数据、预处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# Resize to ResNet's 224x224 input, convert to tensors, and shift every
# channel to roughly [-1, 1].
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])

# CIFAR-10 train/test splits, downloaded under ./data if missing.
training_data = datasets.CIFAR10(
    root="data",
    train=True,
    download=True,
    transform=transform,
)

testing_data = datasets.CIFAR10(
    root="data",
    train=False,
    download=True,
    transform=transform,
)

定义网络

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class BasicBlock(nn.Module):
    """ResNet basic residual block: two 3x3 convs plus a shortcut connection.

    Args:
        in_channels: channels of the input feature map.
        out_channels: channels produced by both convolutions.
        stride: per-conv strides; stride[0] > 1 downsamples in the first conv.
        padding: padding for both 3x3 convolutions.
    """

    # NOTE: default changed from the mutable list [1, 1] to an immutable tuple.
    def __init__(self, in_channels, out_channels, stride=(1, 1), padding=1) -> None:
        super(BasicBlock, self).__init__()
        # Residual branch: conv-BN-ReLU-conv-BN. Bias is omitted because the
        # following BatchNorm already provides a learned shift.
        self.layer = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride[0], padding=padding, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),  # in-place to save memory
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride[1], padding=padding, bias=False),
            nn.BatchNorm2d(out_channels),
        )

        # Shortcut branch: identity when the shape is unchanged, otherwise a
        # 1x1 conv that matches the channel count and spatial size.
        # BUG FIX: the original tested `stride != 1`, but `stride` is a
        # list/tuple, so the comparison was always True and *every* block got
        # a projection shortcut; compare the first conv's stride instead.
        self.shortcut = nn.Sequential()
        if stride[0] != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride[0], bias=False),
                nn.BatchNorm2d(out_channels),
            )

    def forward(self, x):
        out = self.layer(x)
        out += self.shortcut(x)  # residual addition
        out = F.relu(out)
        return out
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# In a batch-norm network, the convolution layers omit the bias term.
class ResNet18(nn.Module):
    """ResNet-18 assembled from a user-supplied basic residual block.

    Args:
        BasicBlock: block class invoked as block(in_channels, out_channels, stride).
        num_classes: size of the final classification layer.
    """

    def __init__(self, BasicBlock, num_classes=10) -> None:
        super(ResNet18, self).__init__()
        self.in_channels = 64
        # Stem (conv1): standalone because it carries no residual connection.
        # BUG FIX: the original went conv -> BN -> maxpool; standard ResNet
        # applies a ReLU between the BatchNorm and the max-pooling.
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
        )
        # conv2_x .. conv5_x: two blocks per stage; the first block of
        # stages 3-5 downsamples with stride 2 in its first convolution.
        self.conv2 = self._make_layer(BasicBlock, 64, [[1, 1], [1, 1]])
        self.conv3 = self._make_layer(BasicBlock, 128, [[2, 1], [1, 1]])
        self.conv4 = self._make_layer(BasicBlock, 256, [[2, 1], [1, 1]])
        self.conv5 = self._make_layer(BasicBlock, 512, [[2, 1], [1, 1]])

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def _make_layer(self, block, out_channels, strides):
        # Stack one residual block per stride pair, tracking the running
        # channel count so consecutive blocks chain correctly.
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv1(x)
        out = self.conv2(out)
        out = self.conv3(out)
        out = self.conv4(out)
        out = self.conv5(out)

        out = self.avgpool(out)
        out = out.reshape(x.shape[0], -1)  # flatten to (batch, 512)
        out = self.fc(out)
        return out

设定参数、查看图像数据

1
2
3
# Mini-batches of 100; drop_last discards any ragged final batch.
batch_size = 100
train_data = DataLoader(dataset=training_data, batch_size=batch_size, shuffle=True, drop_last=True)
test_data = DataLoader(dataset=testing_data, batch_size=batch_size, shuffle=True, drop_last=True)
1
2
3
4
# Peek at one batch to confirm the loader's output layout.
images, labels = next(iter(train_data))
print(images.shape)
# torch.Size([100, 3, 224, 224])
# See torch.utils.data.DataLoader: batches are (batch_size, channels, height, width).
1
2
3
4
5
6
7
# Tile the batch into a single image grid, undo the normalization, and show it.
img = utils.make_grid(images)
img = img.numpy().transpose(1, 2, 0)  # CHW -> HWC for matplotlib
mean = [0.5, 0.5, 0.5]
std = [0.5, 0.5, 0.5]
img = img * std + mean  # inverse of transforms.Normalize
print([labels[idx] for idx in range(64)])
plt.imshow(img)

image-20240608123023647

初始化网络、加载到GPU上、定义损失函数&优化器

1
2
3
# Prefer the first GPU; fall back to the CPU when CUDA is unavailable.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"训练设备为{device}")
# e.g. prints: 训练设备为cuda:0
1
2
3
# Instantiate the network and print it to inspect the architecture.
res18 = ResNet18(BasicBlock)
print(res18)
1
2
3
4
5
6
7
# Move the model to the training device; cross-entropy loss with Adam.
model = res18.to(device)

cost = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())

print(len(train_data))  # 500 batches
print(len(test_data))   # 100 batches

训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
epochs = 10
for epoch in range(epochs):
    running_loss = 0.0   # loss since the last 100-batch report
    # BUG FIX: the original reused running_loss for the epoch summary, but it
    # is reset every 100 batches, so the reported "Train Loss" was wrong.
    # Keep a separate whole-epoch accumulator.
    epoch_loss = 0.0
    running_correct = 0.0
    model.train()
    print("Epoch {}/{}".format(epoch + 1, epochs))
    print("-" * 10)
    for i, data in enumerate(train_data):
        X_train, y_train = data
        X_train, y_train = X_train.to(device), y_train.to(device)
        outputs = model(X_train)
        _, pred = torch.max(outputs.data, 1)
        optimizer.zero_grad()
        loss = cost(outputs, y_train)

        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        epoch_loss += loss.item()
        running_correct += torch.sum(pred == y_train.data)

        # Report the average loss over each window of 100 batches.
        if i % 100 == 99:
            print(f"[Batch{i + 1}] loss:{running_loss / 100:.3f}")
            running_loss = 0.0

    testing_correct = 0
    test_loss = 0
    model.eval()
    with torch.no_grad():  # no gradients needed during evaluation
        for X_test, y_test in test_data:
            X_test, y_test = X_test.to(device), y_test.to(device)
            outputs = model(X_test)
            loss = cost(outputs, y_test)
            _, pred = torch.max(outputs.data, 1)
            testing_correct += torch.sum(pred == y_test.data)
            test_loss += loss.item()
    # BUG FIX: average the accumulated losses over the number of *batches*
    # (len of the DataLoader), not over the number of dataset samples.
    # Accuracies still divide correct-sample counts by sample totals.
    print("Train Loss is:{:.4f}, Train Accuracy is:{:.4f}%, Test Loss is::{:.4f} Test Accuracy is:{:.4f}%".format(
        epoch_loss / len(train_data), 100 * running_correct / len(training_data),
        test_loss / len(test_data),
        100 * testing_correct / len(testing_data)
    ))

image-20240608123958636

可视化预测结果

  • test_data:DataLoader随机选取图片,对比True Label和Predicted Label
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import matplotlib.pyplot as plt
import numpy as np
import random
classes = ('plane', 'car', 'bird', 'cat', 'deer','dog', 'frog', 'horse', 'ship', 'truck')

def visualize_prediction(model, testloader):
    """Pick a random image from one test batch, predict it, and plot the result."""
    # Grab a single batch from the loader.
    images, labels = next(iter(testloader))
    print(f'待预测图像尺寸为',images.shape)

    # BUG FIX: index by the actual batch size instead of the hard-coded
    # range 0..99, which breaks for batch sizes other than 100.
    r = random.randint(0, images.size(0) - 1)
    img = images[r]
    label = labels[r]

    # Run the model on the single image (add a batch dimension first).
    model.eval()
    with torch.no_grad():
        img = img.to(device)
        outputs = model(img.unsqueeze(0))
        _, predicted = torch.max(outputs, 1)

    # Tensor (C, H, W) -> numpy (H, W, C) for matplotlib.
    img = img.cpu().numpy()
    img = np.transpose(img, (1, 2, 0))
    img = img * 0.5 + 0.5  # undo the Normalize((0.5, ...), (0.5, ...)) transform

    # Show the image titled with the true and predicted class names.
    plt.imshow(img)
    plt.title(f"True: {classes[label]}, Predicted: {classes[predicted.item()]}")
    plt.show()
1
2
# Call the function to visualize one random prediction.
visualize_prediction(model, test_data)

image-20240608123636195


参考:【ResNet】Pytorch从零构建ResNet18