本文大多来自于 2023 年初学 pytorch 的笔记,后经过整理。
LLM 生成过于方便,导致很多 pytorch 的操作都忘了很多。
经常 Debug 很久才发现是很基础的调包问题。
因此特意整理一些常用的操作作为备忘录。
其中很多内容来自 2023 年看的
小土堆教程
和李沐老师的动手学习深度学习
非常推荐:pytorch 实用教程
tensor 与 array #
相互转化如下:
import numpy as np
import torch
# numpy 转为 tensor
x = np.ones(5)
print(type(x))
x = torch.tensor(x)
print(type(x))
# tensor 转为 numpy
x = torch.ones(5)
x_numpy = x.detach().numpy() # detach() 切断计算图,不再计算梯度,但仍共享存储
一般而言,在 cuda 加速时 tensor 在 GPU 上。
x = torch.ones(5)
x = x.detach().cpu().numpy()
当需要使用到标量 tensor 时(比如统计 loss 等),可以将 tensor 标量化。
loss_value = loss.item()
# 现在可以用 loss_value 进行日志记录或其他 Python 计算
print(f"Loss: {loss_value}")
当然,tensor 和 array 的转化其实没那么简单,比如经常遇到一些类型问题。
此外,数据从 GPU 到 CPU 转化往往会占用一定的性能,因此能在 tensor 上完成的操作尽量转成 array 再操作。
PyTorch 操作 | NumPy 操作 | 说明 |
---|---|---|
tensor.clamp() | np.clip() | 将值限制在指定范围内 |
torch.cat() | np.concatenate() | 沿指定维度连接数组 |
tensor.view() | array.reshape() | 改变数组形状,不改变数据 |
tensor.permute() | np.transpose() | 维度重排 |
tensor.unsqueeze() | np.expand_dims() | 增加维度 |
tensor.repeat() | np.tile() | 重复数组 |
tensor.t() | array.T | 2D张量的转置 |
dataset 和 dataloader #
实例化 Dataset 作用是告诉程序数据集在哪 并进行转化
其中 root 表示数据集本地位置,train 表示是否训练,download 表示在线下载
使用 transforms 中的 Compose 并结合 ToTensor
在载入数据中可以直接进行转化
# 此节主要了解部分标准数据集的使用方法
import torchvision
from torch.utils.tensorboard import SummaryWriter
# 下面开始配合 transform
dataset_transform = torchvision.transforms.Compose([
torchvision.transforms.ToTensor()
])
# 下面演示 CIFAR10 数据集
# 其中 root 表示数据集本地位置,train 表示训练,download 表示在线下载
train_set = torchvision.datasets.CIFAR10(root="./dataset", train=True, transform=dataset_transform, download=True)
test_set = torchvision.datasets.CIFAR10(root="./dataset", train=False, transform=dataset_transform, download=True)
# 可知数据集格式为 PIL 和 target
print(test_set[0])
print(test_set.classes) # 显示数据集的分类
# 获取数据 体会数据类型
img, target = test_set[0]
print(img)
print(target)
print(test_set.classes[target])
writer = SummaryWriter("logs")
for i in range(10):
img, target = test_set[i]
writer.add_image("test_img", img, i)
相比 DataSet 其更像是一个分拣器,将数据集分成 n 个一组
可以将 DataSet 理解为牌堆,将 DataLoader 理解为手牌
DataLoader 也同样有着较多参数,下面进行介绍
import torchvision
from torch.utils.tensorboard import SummaryWriter
# datasets 实例化过程和上一节相同
dataset_transform = torchvision.transforms.Compose([
torchvision.transforms.ToTensor()
])
train_set = torchvision.datasets.CIFAR10(root="./dataset", train=True, transform=dataset_transform)
test_set = torchvision.datasets.CIFAR10(root="./dataset", train=False, transform=dataset_transform)
from torch.utils.data import DataLoader
# dataset:输入已经实例化的数据集
# batch_size=4:这指定了每个批次中包含的样本数量,模型将会根据每个批次的数据计算梯度并更新模型参数
# shuffle=True:DataLoader 是否在每个 epoch 开始时打乱数据集,以增加训练的随机性
# 1. iteration:1个iteration等于使用batchsize个样本训练一次;
# 2. epoch:1个epoch等于使用训练集中的全部样本训练一次;
# num_workers=0:这是指定用于数据加载的子进程数量。默认值为 0,表示数据将在主进程中加载,增加 num_workers 可以加速数据加载
# drop_last=False:当数据集的样本数量不能被 batch_size 整除时,如果设置为 True,最后一个不完整的批次会被丢弃;如果设置为 False,最后一个不完整的批次仍然会被保留
test_loader = DataLoader(dataset=test_set, batch_size=4, shuffle=True, num_workers=0, drop_last=False)
# 由于 batch_size 为四,则进行四个一组打包
for i, data in enumerate(test_loader):
if i > 5:
break
imgs, targets = data
print("---------")
print(len(imgs))
print(targets)
writer = SummaryWriter("logs")
# 由于 batch_size 为四,则进行四个一组打包
for epoch in range(3):
for i, (imgs, targets) in enumerate(test_loader):
writer.add_images("epoch:{}".format(epoch), imgs, i)
writer.close()
tensor transforms
# 注意 此处只是导入 vision 的 transforms
# 还有其他领域 如 text 的 transforms
from torchvision import transforms
from torch.utils.tensorboard import SummaryWriter
from PIL import Image
writer = SummaryWriter("tensor_logs")
# 先学习 tensor (张量) 的数据类型
image_path = "data/reality_data/train/ants_image/0013035.jpg"
img = Image.open(image_path)
print(type(img))
# 将变化工具实例化
tensor_trans = transforms.ToTensor()
print(type(tensor_trans))
# 用 tensor_trans 变化 img
tensor_img = tensor_trans(img)
print(type(tensor_img))
# 可以看到 tensor 本质上是多维数组 可以理解为向量矩阵
print(tensor_img)
# 下面结合 TensorBoard 表示
writer.add_image("tensor", tensor_img)
# 下面演示 Normalize 设置均方和标准差都是 1/2
trans_norm = transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
print(trans_norm)
img_norm = trans_norm(tensor_img)
print(img_norm)
writer.add_image("Normalize", img_norm)
# 下面演示 Resize
# 要注意 Resize 传入的是 PIL,而非 tensor
# 其参数传入一个代表正方形 多个表示长宽
print(img.size)
trans_resize = transforms.Resize((512, 512))
img_resize = trans_resize(img)
print(img_resize.size)
# 下一步将新图像转化为张量
img_resize = tensor_trans(img_resize)
print(img_resize)
writer.add_image("Resize", img_resize)
# Compose - resize - 2
# Compose 实际上就是组合功能 下面组合 Resize 和 ToTensor
trans_resize_2 = transforms.Resize(512)
# 前面函数的输出要匹配后面函数的输入
trans_compose = transforms.Compose([trans_resize_2, tensor_trans])
img_resize_2 = trans_compose(img)
writer.add_image("Resize", img_resize_2, 1)
# RandomCrop 随机裁剪
trans_random = transforms.RandomCrop((128, 256))
trans_compose2 = transforms.Compose([trans_random, tensor_trans])
for i in range(10):
img_crop = trans_compose2(img)
writer.add_image("RandomCrop", img_crop, i)
writer.close()
tensorboard #
- TensorBoard 是 TensorFlow 中强大的可视化工具,支持标量、文本、图像、音频、视频
- TensorBoard 可以为下面 Transforms 的使用铺路
在控制台输入 打开网页即可查看可视化结果
其中 logs 为名字 需要匹配 SummaryWriter("")
tensorboard --logdir=logs
tensorboard --logdir=logs --port=6007
第二个为指定端口号
当进行更新时 只需要刷新TB网页即可
from torch.utils.tensorboard import SummaryWriter
import numpy as np
from PIL import Image
writer = SummaryWriter("logs1")
image_path = "data/reality_data/train/ants_image/0013035.jpg"
img_PIL = Image.open(image_path)
img_array = np.array(img_PIL)
print(type(img_array))
# shape 得到的是高度 宽度 通道
# 通道是图像的属性 指的是 RGB
print(img_array.shape)
# 需要导入的图像是 numpy 类型的
# 需要指定 img_array 的 shape 类型为 H (height) W (weight) C (channel)
writer.add_image("test", img_array, 1, dataformats='HWC')
image_path = "data/reality_data/train/bees_image/16838648_415acd9e3f.jpg"
img_PIL = Image.open(image_path)
img_array = np.array(img_PIL)
print(type(img_array))
writer.add_image("test", img_array, 2, dataformats='HWC')
# 下面测试添加函数图像
# 要注意 TB 只支持一个函数一次拟合 多次 add_scalar 不会清空之前的数据
for i in range(100):
writer.add_scalar("y=2x", -i, i)
for i in range(1, 100):
writer.add_scalar("y=1/x", 1 / i, i)
writer.close()
经典 MLP #
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
import numpy as np
# 设置随机种子
torch.manual_seed(42)
# 1. 定义MLP模型
class MLP(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super(MLP, self).__init__()
# 定义网络层
self.layer1 = nn.Linear(input_size, hidden_size)
self.relu = nn.ReLU()
self.layer2 = nn.Linear(hidden_size, output_size)
def forward(self, x):
# 前向传播
x = self.layer1(x)
x = self.relu(x)
x = self.layer2(x)
return x
# 2. 生成一些示例数据
x_data = torch.linspace(-5, 5, 200).reshape(-1, 1)
y_data = 2 * x_data + torch.randn_like(x_data) * 0.5
# 3. 创建数据集和数据加载器
dataset = TensorDataset(x_data, y_data)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
# 4. 初始化模型、损失函数和优化器
input_size = 1
hidden_size = 10
output_size = 1
model = MLP(input_size, hidden_size, output_size)
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)
# 5. 训练循环
num_epochs = 100
loss_history = []
for epoch in range(num_epochs):
epoch_loss = 0.0
for batch_x, batch_y in dataloader:
# 前向传播
outputs = model(batch_x)
loss = criterion(outputs, batch_y)
# 反向传播和优化
optimizer.zero_grad() # 清除梯度
loss.backward() # 计算梯度
optimizer.step() # 更新参数
epoch_loss += loss.item()
# 记录每个epoch的平均损失
avg_loss = epoch_loss / len(dataloader)
loss_history.append(avg_loss)
# 每10个epoch打印一次损失
if (epoch + 1) % 10 == 0:
print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}')
# 6. 评估模型
model.eval()
with torch.no_grad():
predictions = model(x_data)
# 7. 可视化结果
plt.figure(figsize=(12, 5))
# 绘制损失曲线
plt.subplot(1, 2, 1)
plt.plot(loss_history)
plt.title('Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
# 绘制预测结果
plt.subplot(1, 2, 2)
plt.scatter(x_data.numpy(), y_data.numpy(), label='Data', alpha=0.4)
plt.plot(x_data.numpy(), predictions.numpy(), 'r', linewidth=2, label='Prediction')
plt.title('Model Prediction')
plt.xlabel('x')
plt.ylabel('y')
plt.legend()
plt.tight_layout()
plt.show()
# 打印一些模型参数
print("Model parameters:")
for name, param in model.named_parameters():
print(f"{name}: {param.data}")
经典 CNN #
卷积层 #
- 不在此处写卷积原理了
- 一般在初始化中定义好卷积函数 conv1, conv2
- 一般调用 Conv2d 包,其他很少使用
- 卷积核由种子随机生成,在后续的反向传播中调整
Conv2d 使用方法
-
in_channels 指输入通道 彩色图一般为 3
-
out_channels 是输出通道 指的是用n个卷积核得到n个结果通道
-
kernel_size 是卷积核的大小
-
stride 是步长 默认为一
-
padding 指是否填充原图像边缘 默认填充零
池化层 #
-
常用最大池化,即在一定范围矩阵内取最大值
-
主要目的是降低数据维度
-
常用 MaxPool2d 其参数大部分和卷积相同
-
ceil_mode 当为True时,其输出大小为向上取整,简单来说,和 padding 类似,不足剩余卷积核的部分是否选最大值出
import torch
import torchvision
from torch import nn, reshape, flatten
from torch.nn import Conv2d, MaxPool2d, ReLU, Linear, Flatten
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
# 之前解释过的 dataset 和 dataloader
dataset = torchvision.datasets.CIFAR10("./dataset", train=False, transform=torchvision.transforms.ToTensor())
dataloader = DataLoader(dataset, batch_size=64)
class MY_NN(nn.Module):
def __init__(self):
super(MY_NN, self).__init__()
# 第一个卷积卷积函数
# in_channels 指输入通道 彩色图一般为 3
# out_channels 是输出通道 指的是用n个卷积核得到n和结果通道
# kernel_size 是卷积核的大小
# stride 是步长 默认为一
# padding 指是否填充原图像边缘 默认填充零
self.conv1 = Conv2d(in_channels=3, out_channels=6, kernel_size=3, stride=1, padding=0)
# 向前传播进行一层卷积
def forward(self, x):
x = self.conv1(x)
return x
my_nn = MY_NN()
writer = SummaryWriter("logs")
for step, (imgs, targets) in enumerate(dataloader):
if step == 0:
print(imgs.shape)
# 注意此处是 add_images
writer.add_images("input", imgs, step)
output = my_nn(imgs)
if step == 0:
print(output.shape)
# 此处的 output 是六通道 无法直接输出
# 强制转化为三通道或一通道进行输出
# -1 表示自动判断批处理大小
output = reshape(output, (-1, 3, 30, 30))
writer.add_images("output_conv", output, step)
# 最大池化层
class MY_NN_Pool(nn.Module):
def __init__(self):
super(MY_NN_Pool, self).__init__()
# kernel_size stride padding 与卷积操作相同
# ceil_mode 当为True时,其输出大小为向上取整,简单来说,和 padding 类似,不足剩余卷积核的部分是否选最大值
self.maxpool1 = MaxPool2d(kernel_size=3, padding=0, ceil_mode=True)
# 向前传播进行一层卷积
def forward(self, x):
x = self.maxpool1(x)
return x
my_nn_pool = MY_NN_Pool()
for step, (imgs, targets) in enumerate(dataloader):
output = my_nn_pool(imgs)
if step == 0:
print(output.shape)
output = reshape(output, (-1, 3, 30, 30))
writer.add_images("output_pool", output, step)
# 激活函数
class MY_NN_Activitions(nn.Module):
def __init__(self):
super(MY_NN_Activitions, self).__init__()
# inplace 为 True 时为浅拷贝,否则为深拷贝
self.relu1 = ReLU(inplace=False)
# 向前传播进行一层卷积
def forward(self, x):
x = self.relu1(x)
return x
class MY_NN_linear(nn.Module):
def __init__(self):
super(MY_NN_linear, self).__init__()
# 注意线性变化是一维的
# in_features 要匹配输入大小,设置 out_features 是指定输出大小,bias 指是否偏置
# 此处的 196608 是为了适配输入
self.linear1 = Linear(in_features=196608, out_features=10, bias=False)
# 向前传播进行一层卷积
def forward(self, x):
# 我的 dataloader 会取不完整的一个 bitch_size 因此会导致最后一个保证
# 因此需要加一个判断 size 的操作
if x.shape == torch.Size([196608]):
x = self.linear1(x)
return x
my_nn_linear = MY_NN_linear()
for i, (imgs, targets) in enumerate(dataloader):
# 由于输入要是一位的 则需要将其拍扁
output = flatten(imgs)
if i == 0:
print(output)
output = my_nn_linear(output)
if i == 0:
print(output)
writer.close()
# 测试上面的方法
class NN_final_test(nn.Module):
def __init__(self):
super(NN_final_test, self).__init__()
self.model1 = nn.Sequential(
Conv2d(3, 32, 5, padding=2),
MaxPool2d(2),
Conv2d(32, 32, 5, padding=2),
MaxPool2d(2),
Conv2d(32, 64, 5, padding=2),
Flatten(),
Linear(4096, 64),
Linear(64, 10),
)
def forward(self, x):
x = self.conv1(x)
x = self.maxpool1(x)
x = self.conv2(x)
x = self.maxpool2(x)
x = self.conv3(x)
x = self.flatten(x)
x = self.liner1(x)
x = self.liner2(x)
return x
nn_final_test = NN_final_test()
print(nn_final_test)
for imgs, targets in dataloader:
print(imgs)
print(nn_final_test.model1(imgs))
break
# 交叉熵损失函数
loss = nn.CrossEntropyLoss()
# 优化器 也就是反向传播算法
# 此处用 SGD 随机梯度下降,传入参数与学习率
optim = torch.optim.SGD(nn_final_test.parameters(), lr=0.01)
# 进行多轮学习
for epoch in range(20):
run_loss = 0 # 计算累计损失率
for imgs, targets in dataloader:
outputs = nn_final_test(imgs)
result_loss = loss(outputs, targets) # 传入输入和目标函数
optim.zero_grad() # 将梯度清零
result_loss.backward() # 重新计算梯度
optim.step() # 执行 SGD
run_loss += result_loss
print(run_loss)