GNN图神经网络
算法使用经典的卷积神经网络,实现对动画片段分为三帧并进行分类。 算法IDE:Pycharm 语言:Python 工具:Pytorch(CPU)
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
from torch.utils.data import DataLoader, sampler, Dataset
import torchvision.datasets as dset
import torchvision.transforms as T
import timeit
from PIL import Image
import os
import numpy as np
import scipy.io
import torchvision.models.inception as inception
# 很多数据集都是mat格式的标注信息,使用模块scipy.io的函数loadmat和savemat可以实现Python对mat数据的读写。
label_mat = scipy.io.loadmat('./../data//q3_2_data.mat')
label_train = label_mat['trLb']
print('train_length:', len(label_train))
label_value = label_mat['valLb']
print('val len:', len(label_value))
class ActionDataset(Dataset):
"""Action dataset."""
def __init__(self, root_dir, labels=[], transform=None):
"""
Args:
root_dir (String): 整个数据的路径
labels(list): 图片标签
transform(callable, optional): 想要对数据进行的处理函数
"""
self.root_dir = root_dir
self.transform = transform
self.length = len(os.listdir(root_dir))
self.labels = labels
def __len__(self): # 该方法只需返回数据的数量
return self.length * 3 # 每个视频片段都包含了3帧
def __getitem__(self, idx): # 该方法需要返回一个数据
folder = idx // 3 + 1
imidx = idx % 3 + 1
folder = format(folder, '05d')
imgname = str(imidx) + '.jpg'
img_path = os.path.join(self.root_dir, folder, imgname)
image = Image.open(img_path)
if len(self.labels) != 0:
Label = self.labels[idx // 3][0] - 1
if self.transform: # 如果要先对数据进行预处理,则经过transform函数
image = self.transform(image)
if len(self.labels) != 0:
sample = {'image': image, 'img_path': img_path, 'Label': Label}
else:
sample = {'image': image, 'img_path': img_path}
return sample
image_dataset = ActionDataset(root_dir='./../data//trainClips/', labels=label_train, transform=T.ToTensor())
# torchvision.transforms中定义了非常多对图像的预处理方法,这里使用的ToTensor方法为将0~255的RGB值映射到0~1的Tensor类型。
for i in range(3):
sample = image_dataset[i]
print(sample['image'].shape)
print(sample['Label'])
print(sample['img_path'])
image_dataloader = DataLoader(image_dataset, batch_size=4,
shuffle=True, num_workers=0)
for i, sample in enumerate(image_dataloader):
sample['image'] = sample['image']
print(i, sample['image'].shape, sample['img_path'], sample['Label'])
if i > 5:
break
image_dataset_train = ActionDataset(root_dir='./../data//trainClips//', labels=label_train, transform=T.ToTensor())
image_dataloader_train = DataLoader(image_dataset_train, batch_size=32, shuffle=True, num_workers=0)
image_dataset_val = ActionDataset(root_dir='./..//data//valClips//', labels=label_value, transform=T.ToTensor())
image_dataloader_val = DataLoader(image_dataset_val, batch_size=32, shuffle=False, num_workers=0)
image_dataset_test = ActionDataset(root_dir='./..//data//testClips//', labels=[], transform=T.ToTensor())
image_dataloader_test = DataLoader(image_dataset_test, batch_size=32, shuffle=False, num_workers=0)
dtype = torch.FloatTensor # 这是pytorch所支持的cpu数据类型中的浮点数类型。
print_every = 100 # 这个参数用于控制loss的打印频率,因为我们需要在训练过程中不断的对loss进行检测。
def reset(m): # 这是模型参数的初始化
if hasattr(m, 'reset_parameters'):
m.reset_parameters()
class Flatten(nn.Module):
def forward(self, x):
N, C, H, W = x.size() # 读取各个维度
return x.view(N, -1) # -1代表除了特殊声明过以外的全部维度
# torch.nn.Sequential是一个Sequential容器,模块将按照构造函数中传递的顺序添加到模块中。
fixed_model_base = nn.Sequential(
nn.Conv2d(3, 8, kernel_size=7, stride=1), # 3*64*64 -> 8*58*58
nn.ReLU(inplace=True),
nn.MaxPool2d(2, stride=2), # 8*58*58 -> 8*29*29
nn.Conv2d(8, 16, kernel_size=7, stride=1), # 8*29*29 -> 16*23*23
nn.ReLU(inplace=True),
nn.MaxPool2d(2, stride=2), # 16*23*23 -> 16*11*11
Flatten(),
nn.ReLU(inplace=True),
nn.Linear(1936, 10) # 1936 = 16*11*11
)
# 这里模型base.type()方法是设定模型使用的数据类型,之前设定的cpu的Float类型。
# 如果想要在GPU上训练则需要设定cuda版本的Float类型。
fixed_model = fixed_model_base.type(dtype)
x = torch.randn(32, 3, 64, 64).type(dtype)
x_var = Variable(x.type(dtype)) # 需要将其封装为Variable类型。
ans = fixed_model(x_var)
print(np.array(ans.size())) # 检查模型输出
np.array_equal(np.array(ans.size()), np.array([32, 10]))
def train(model, loss_fn, optimizer, dataloader, num_epochs=1):
for epoch in range(num_epochs):
print('Starting epoch %d / %d' % (epoch + 1, num_epochs))
# 在验证集上验证模型效果
check_accuracy(fixed_model, image_dataloader_val)
model.train() # 模型的.train()方法让模型进入训练状态,参数保留梯度,dropout层等部分正常工作
for t, sample in enumerate(dataloader):
x_var = Variable(sample['image'])
y_var = Variable(sample['Label'].long()) # 取得对应的标签
scores = model(x_var) # 得到输出
loss = loss_fn(scores, y_var) # 计算loss
if (t + 1) % print_every == 0:
print('t = %d, loss = %.4f' % (t + 1, loss.item()))
# 三步更新参数
optimizer.zero_grad()
loss.backward()
optimizer.step()
def check_accuracy(model, loader):
num_correct = 0
num_samples = 0
model.eval() # 模型的.eval()方法切换进入评测模式,对应的dropout等部分将停止工作
for t, sample in enumerate(loader):
x_var = Variable(sample['image'])
y_var = sample['Label']
scores = model(x_var)
_, preds = scores.data.max(1) # 找到可能最高的标签作为输出
num_correct += (preds.numpy() == y_var.numpy()).sum()
num_samples += preds.size(0)
acc = float(num_correct) / num_samples
print('Got %d / %d correct (%.2f)' % (num_correct, num_samples, 100 * acc))
optimizer = torch.optim.RMSprop(fixed_model_base.parameters(), lr=0.0001)
loss_fn = nn.CrossEntropyLoss()
torch.random.manual_seed(54321)
fixed_model.cpu()
fixed_model.apply(reset)
fixed_model.train()
train(fixed_model, loss_fn, optimizer, image_dataloader_train, num_epochs=5)
check_accuracy(fixed_model, image_dataloader_val)
def predict_on_test(model, loader):
num_correct = 0
num_samples = 0
model.eval()
results = open('results.csv', 'w')
count = 0
results.write('Id' + ',' + 'Class' + '\n')
for t, sample in enumerate(loader):
x_var = Variable(sample['image'])
scores = model(x_var)
_, preds = scores.data.max(1)
for i in range(len(preds)):
results.write(str(count) + ',' + str(preds[i]) + '\n')
count += 1
results.close()
return count
count = predict_on_test(fixed_model, image_dataloader_test) # 放入你想要测试的训练集,然后打开文件去看一看结果吧。
print(count)