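3. FaceNet
For the theory behind FaceNet and triplet loss, please review the relevant chapters of the lecture course. Here we train a resnet18 network with triplet loss and use it for KNN classification on the mnist dataset. Specifically, resnet18 acts as a feature extractor: the features of all training-set images are used to fit a KNN classifier, and that classifier makes the predictions. Section 3.1 gives the implementation of triplet loss. Section 3.2 implements a resnet18 network suited to triplet-loss training. Section 3.3 implements a dataset that selects triplets at random, and Sections 3.4 and 3.5 implement the training and test functions for resnet18.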
embedding size
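FaceNet embeds an image into a d-dimensional space in which features of images from the same class lie close together and features of images from different classes lie far apart. We call this d the embedding size.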
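As a minimal illustration of this idea, the sketch below compares Euclidean distances between hand-made 2-d embeddings (so d = 2). The vectors and their names are hypothetical values chosen for the example, not outputs of the network.

```python
import math

def euclidean(u, v):
    """L2 distance between two equal-length vectors."""
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(u, v)))

# Hypothetical 2-d embeddings of three images.
emb_seven_a = [0.9, 0.1]  # an image of the digit 7
emb_seven_b = [0.8, 0.2]  # another image of the digit 7
emb_three = [0.1, 0.9]    # an image of the digit 3

same_class = euclidean(emb_seven_a, emb_seven_b)
diff_class = euclidean(emb_seven_a, emb_three)

# A well-trained embedding keeps same-class pairs closer than different-class pairs.
print(same_class < diff_class)
```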
3.1 triplet loss
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.backends.cudnn as cudnn
import os
import torchvision
import torchvision.transforms as transforms
from torch.autograd import Variable

class PairwiseDistance(nn.Module):
    '''
    Compute the distance between embedding features. p is the norm order;
    when p is 2, the L2-norm distance is returned.
    '''
    def __init__(self, p):
        super(PairwiseDistance, self).__init__()
        self.norm = p

    def forward(self, x1, x2):
        eps = 1e-6  # keeps the value under the root away from zero
        diff = torch.abs(x1 - x2)  # element-wise absolute difference
        out = torch.pow(diff, self.norm).sum(dim=1)  # sum of p-th powers per sample
        return torch.pow(out + eps, 1. / self.norm)  # L-p norm

class TripletLoss(nn.Module):
    '''
    Triplet loss function.
    loss = max(distance(a,p) - distance(a,n) + margin, 0)
    forward method:
        args:
            anchor, positive, negative
        return:
            triplet loss
    '''
    def __init__(self, margin, num_classes=10):
        super(TripletLoss, self).__init__()
        self.margin = margin
        self.num_classes = num_classes
        self.pdist = PairwiseDistance(2)  # to calculate distance

    def forward(self, anchor, positive, negative):
        d_p = self.pdist.forward(anchor, positive)  # distance of anchor and positive
        d_n = self.pdist.forward(anchor, negative)  # distance of anchor and negative
        dist_hinge = torch.clamp(self.margin + d_p - d_n, min=0.0)  # ensure loss is no less than zero
        loss = torch.mean(dist_hinge)  # average over the batch
        return loss
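To make the hinge in the loss concrete, here is a small worked example in plain Python. It is only a sketch of the same formula for a single triplet: it leaves out the eps term and the batch mean used in the class above, and the points are made-up values.

```python
import math

def l2_distance(u, v):
    """Euclidean distance between two vectors."""
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(u, v)))

def triplet_loss(anchor, positive, negative, margin):
    """max(d(a, p) - d(a, n) + margin, 0) for one triplet."""
    d_p = l2_distance(anchor, positive)
    d_n = l2_distance(anchor, negative)
    return max(margin + d_p - d_n, 0.0)

anchor = [0.0, 0.0]
positive = [3.0, 4.0]       # d_p = 5
easy_negative = [6.0, 8.0]  # d_n = 10, already beyond d_p + margin
hard_negative = [0.0, 6.0]  # d_n = 6, inside d_p + margin

loss_easy = triplet_loss(anchor, positive, easy_negative, margin=2.0)  # 2 + 5 - 10 < 0
loss_hard = triplet_loss(anchor, positive, hard_negative, margin=2.0)  # 2 + 5 - 6 = 1
print(loss_easy, loss_hard)
```

The easy negative contributes no loss and therefore no gradient, which is why the choice of triplets matters so much for training.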
3.2 resnet-18 for triplet loss
class BasicBlock(nn.Module):
    '''
    resnet basic block.
    one block includes two conv layers and one residual connection
    '''
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

class ResNetTriplet(nn.Module):
    def __init__(self, block, num_blocks, embedding_size=256, num_classes=10):
        super(ResNetTriplet, self).__init__()
        self.in_planes = 64
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        # feature map size 32x32
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        # feature map size 32x32
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        # feature map size 16x16
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        # feature map size 8x8
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        # feature map size 4x4
        # as we use resnet basic block, the expansion is 1
        self.linear = nn.Linear(512*block.expansion, embedding_size)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def l2_norm(self, input):
        input_size = input.size()
        buffer = torch.pow(input, 2)
        normp = torch.sum(buffer, 1).add_(1e-10)
        norm = torch.sqrt(normp)
        _output = torch.div(input, norm.view(-1, 1).expand_as(input))
        output = _output.view(input_size)
        return output

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        # normalize the features so that the margin is easy to set
        self.features = self.l2_norm(out)
        # multiply by alpha = 10 as suggested in https://arxiv.org/pdf/1703.09507.pdf
        alpha = 10
        self.features = self.features * alpha
        # here we get the embedding_size-d features, next we use those features to make prediction
        return self.features

def ResNet18(embedding_size=256, num_classes=10):
    return ResNetTriplet(BasicBlock, [2,2,2,2], embedding_size, num_classes)
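The forward pass above L2-normalizes each feature vector and scales it by alpha = 10, so every embedding has length alpha and any two embeddings are at most 2 * alpha = 20 apart. The sketch below checks this bound on toy vectors in plain Python; it omits the small 1e-10 term used in l2_norm, and the input vectors are made-up values.

```python
import math

def l2_normalize(v, alpha=1.0):
    """Rescale v to length alpha (unit length when alpha is 1)."""
    norm = math.sqrt(sum(x * x for x in v))
    return [alpha * x / norm for x in v]

def l2_distance(u, v):
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(u, v)))

alpha = 10
u = l2_normalize([3.0, 4.0], alpha)    # -> [6.0, 8.0]
v = l2_normalize([-3.0, -4.0], alpha)  # opposite direction to u
w = l2_normalize([4.0, 3.0], alpha)    # similar direction to u

far = l2_distance(u, v)   # the largest possible distance, 2 * alpha
near = l2_distance(u, w)
print(far, near)
```

Because all distances fall in a known range, a margin such as 2.0 has a fixed meaning relative to that range instead of depending on the raw scale of the network output.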
3.3 triplet dataloader
Question 3
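Read the code below carefully and think about the role of pic_classes, then answer: the triplets below are selected at random. How would you change the code to select triplets from specified classes? Write the two modified lines of code.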
Answer
pic_classes makes it convenient to look up, class by class, the image ids listed in train.csv and test.csv. For example, pic_classes['0'] holds the list of ids of the images that show the digit 0.
To produce a triplet sample, we take two images from pos_class (one anchor and one positive) and one image from neg_class.
# you need to assign a value to selected_pos_class and selected_neg_class first
pos_class = selected_pos_class
neg_class = selected_neg_class
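The two lines above fix the classes; the image indices are still drawn at random. A minimal sketch of that step follows, assuming a pic_classes mapping from class label to image ids. The helper name sample_triplet and the toy ids are hypothetical and not part of the assignment code.

```python
import random

def sample_triplet(pic_classes, pos_class, neg_class, rng=random):
    """Pick (anchor_id, positive_id, negative_id) from two specified classes."""
    if pos_class == neg_class:
        raise ValueError("pos_class and neg_class must differ")
    if len(pic_classes[pos_class]) < 2:
        raise ValueError("pos_class needs at least two images")
    # Two distinct images from the positive class: anchor and positive.
    anc_id, pos_id = rng.sample(pic_classes[pos_class], 2)
    # Any image from the negative class.
    neg_id = rng.choice(pic_classes[neg_class])
    return anc_id, pos_id, neg_id

# Hypothetical toy mapping: class label -> list of image ids.
pic_classes = {3: [10, 11, 12], 8: [20, 21]}
anc_id, pos_id, neg_id = sample_triplet(pic_classes, pos_class=3, neg_class=8)
print(anc_id, pos_id, neg_id)
```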
import os
import numpy as np
import pandas as pd
import torch
from PIL import Image
from torch.utils.data import Dataset

class TripletFaceDataset(Dataset):
    def __init__(self, root_dir, csv_name, num_triplets, transform = None):
        '''
        randomly select triplets, which means anchor, positive and negative are all selected randomly.
        args:
            root_dir : dir of data set
            csv_name : dir of train.csv
            num_triplets: total number of triplets
        '''
        self.root_dir = root_dir
        self.df = pd.read_csv(csv_name)
        self.num_triplets = num_triplets
        self.transform = transform
        self.training_triplets = self.generate_triplets(self.df, self.num_triplets)

    @staticmethod
    def generate_triplets(df, num_triplets):
        def make_dictionary_for_pic_class(df):
            '''
            make csv to the format that we want
            - pic_classes = {'class0': [class0_id0, ...], 'class1': [class1_id0, ...], ...}
            '''
            pic_classes = dict()
            for idx, label in enumerate(df['class']):
                if label not in pic_classes:
                    pic_classes[label] = []
                pic_classes[label].append(df.iloc[idx, 0])
            return pic_classes

        triplets = []
        classes = df['class'].unique()
        pic_classes = make_dictionary_for_pic_class(df)
        for _ in range(num_triplets):
            # - randomly choose anchor, positive and negative images for triplet loss
            # - anchor and positive images come from pos_class
            # - negative image comes from neg_class
            # - pos_class needs at least two images (anchor and positive)
            # - by definition the negative image has a different class from anchor and positive
            pos_class = np.random.choice(classes)  # randomly choose positive class
            neg_class = np.random.choice(classes)  # randomly choose negative class
            # re-draw until the positive class has at least two images
            while len(pic_classes[pos_class]) < 2:
                pos_class = np.random.choice(classes)
            # re-draw if the negative class equals the class of anchor and positive
            while pos_class == neg_class:
                neg_class = np.random.choice(classes)
            pos_name = df.loc[df['class'] == pos_class, 'name'].values[0]  # get positive class's name
            neg_name = df.loc[df['class'] == neg_class, 'name'].values[0]  # get negative class's name
            if len(pic_classes[pos_class]) == 2:
                ianc, ipos = np.random.choice(2, size = 2, replace = False)
            else:
                # both anchor and positive images are in pos_class but are not the same image
                ianc = np.random.randint(0, len(pic_classes[pos_class]))  # randomly choose anchor
                ipos = np.random.randint(0, len(pic_classes[pos_class]))  # randomly choose positive
                while ianc == ipos:
                    ipos = np.random.randint(0, len(pic_classes[pos_class]))
            ineg = np.random.randint(0, len(pic_classes[neg_class]))  # randomly choose negative
            triplets.append([pic_classes[pos_class][ianc], pic_classes[pos_class][ipos], pic_classes[neg_class][ineg],
                             pos_class, neg_class, pos_name, neg_name])
        return triplets

    def __getitem__(self, idx):
        anc_id, pos_id, neg_id, pos_class, neg_class, pos_name, neg_name = self.training_triplets[idx]
        anc_img = os.path.join(self.root_dir, str(pos_name), str(anc_id) + '.png')  # join the path of anchor
        pos_img = os.path.join(self.root_dir, str(pos_name), str(pos_id) + '.png')  # join the path of positive
        neg_img = os.path.join(self.root_dir, str(neg_name), str(neg_id) + '.png')  # join the path of negative
        anc_img = Image.open(anc_img).convert('RGB')  # open the anchor image
        pos_img = Image.open(pos_img).convert('RGB')  # open the positive image
        neg_img = Image.open(neg_img).convert('RGB')  # open the negative image
        pos_class = torch.from_numpy(np.array([pos_class]).astype('long'))  # convert label to a long tensor
        neg_class = torch.from_numpy(np.array([neg_class]).astype('long'))  # convert label to a long tensor
        data = [anc_img, pos_img, neg_img]
        label = [pos_class, pos_class, neg_class]
        if self.transform:
            data = [self.transform(img)  # preprocess the image
                    for img in data]
        return data, label

    def __len__(self):
        return len(self.training_triplets)
3.4 train function for triplet loss
import torchvision.transforms as transforms

def train_facenet(epoch, model, optimizer, margin, num_triplets):
    model.train()
    # preprocessing function for image
    transform = transforms.Compose([
        transforms.Resize(32),
        transforms.CenterCrop(32),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=np.array([0.4914, 0.4822, 0.4465]),
            std=np.array([0.2023, 0.1994, 0.2010])),
    ])
    # get dataset of triplet
    # num_triplets is adjustable
    train_set = TripletFaceDataset(root_dir = './mnist/train',
                                   csv_name = './mnist/train.csv',
                                   num_triplets = num_triplets,
                                   transform = transform)
    train_loader = torch.utils.data.DataLoader(train_set,
                                               batch_size = 16,
                                               shuffle = True)
    total_loss = 0.0
    for batch_idx, (data, target) in enumerate(train_loader):
        # load data to gpu
        data[0], target[0] = data[0].cuda(device='cuda:1'), target[0].cuda(device='cuda:1')  # anchor to cuda
        data[1], target[1] = data[1].cuda(device='cuda:1'), target[1].cuda(device='cuda:1')  # positive to cuda
        data[2], target[2] = data[2].cuda(device='cuda:1'), target[2].cuda(device='cuda:1')  # negative to cuda
        data[0], target[0] = Variable(data[0]), Variable(target[0])  # anchor
        data[1], target[1] = Variable(data[1]), Variable(target[1])  # positive
        data[2], target[2] = Variable(data[2]), Variable(target[2])  # negative
        # zero the gradients
        optimizer.zero_grad()
        # forward
        anchor = model.forward(data[0])
        positive = model.forward(data[1])
        negative = model.forward(data[2])
        # margin is adjustable
        loss = TripletLoss(margin=margin, num_classes=10).forward(anchor, positive, negative)  # get triplet loss
        total_loss += loss.item()
        # back-propagation
        loss.backward()
        optimizer.step()
    # report the loss averaged over all batches of this epoch
    context = 'Train Epoch: {} [{}/{}], Average loss: {:.4f}'.format(
        epoch, len(train_loader.dataset), len(train_loader.dataset), total_loss / len(train_loader))
    print(context)
3.5 test function for triplet loss
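On how to test: the resnet18 network trained with triplet loss has no classifier. The output of its last layer is a vector of dimension embedding_size, which we treat as the feature extracted by the model, so we use this feature for testing. First store the features and labels of all training-set images and fit a KNN classifier with KNeighborsClassifier() from the sklearn library. Here K is the number of neighbors and is an adjustable parameter. To validate on the test set, extract each image's feature and predict with the KNN classifier.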
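To show what the KNN step does conceptually, the following brute-force sketch predicts a label by majority vote among the k nearest stored features. It is a plain-Python stand-in for illustration only, not the sklearn implementation, and the feature vectors are made-up 2-d values.

```python
import math
from collections import Counter

def knn_predict(train_features, train_labels, query, k):
    """Majority vote among the k training features closest to query."""
    dists = sorted(
        (math.dist(feature, query), label)
        for feature, label in zip(train_features, train_labels)
    )
    votes = Counter(label for _, label in dists[:k])
    return votes.most_common(1)[0][0]

# Hypothetical embeddings of six training images from two classes.
train_features = [[0, 0], [0, 1], [1, 0], [5, 5], [5, 6], [6, 5]]
train_labels = [0, 0, 0, 1, 1, 1]

pred_a = knn_predict(train_features, train_labels, [0.5, 0.5], k=3)
pred_b = knn_predict(train_features, train_labels, [5.2, 5.1], k=3)
print(pred_a, pred_b)
```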
Question 4
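Read the code below carefully and answer: the prediction method below is KNN. To switch to center-point prediction, that is, finding for each class the image closest to the class mean and doing nearest-neighbor prediction against those images, briefly describe how to find the center points. No code is required.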
Answer 4
- First, the model outputs the embedding vector of every training image, and the vectors are grouped by class into clusters.
- Then, the mean value (the center) of each cluster is calculated.
- In each cluster, the image whose embedding is closest to the center is chosen as the class representative.
- At prediction time, the distance between the test image's embedding and each representative is calculated.
- Finally, the class of the nearest representative is taken as the classification result.
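Although the question asks for no code, the steps in this answer can be sketched in a few lines. The version below works on plain Python lists, and the function names class_representatives and predict as well as the toy features are hypothetical.

```python
import math

def class_representatives(features, labels):
    """For each class, return the index of the sample closest to the class mean."""
    by_class = {}
    for idx, label in enumerate(labels):
        by_class.setdefault(label, []).append(idx)
    reps = {}
    for label, indices in by_class.items():
        dim = len(features[indices[0]])
        # Mean of the cluster, computed per dimension.
        center = [sum(features[i][d] for i in indices) / len(indices) for d in range(dim)]
        reps[label] = min(indices, key=lambda i: math.dist(features[i], center))
    return reps

def predict(features, reps, query):
    """Return the class whose representative is nearest to query."""
    return min(reps, key=lambda label: math.dist(features[reps[label]], query))

# Hypothetical 2-d embeddings of six training images from two classes.
features = [[0, 0], [2, 0], [1, 0.1], [10, 10], [12, 10], [11, 10.2]]
labels = [0, 0, 0, 1, 1, 1]

reps = class_representatives(features, labels)
pred = predict(features, reps, [1.5, 0])
print(reps, pred)
```

Compared with KNN over all training features, this keeps only one stored feature per class, so prediction is cheaper but more sensitive to classes that form several separate clusters.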
from sklearn import neighbors
import pandas
import matplotlib.pyplot as plt

def KNN_classifier(model, epoch, n_neighbors):
    '''
    use all train set data to make KNN classifier
    '''
    model.eval()
    # preprocessing function for image
    # note: these mean/std values differ from the ones used in train_facenet()
    transform = transforms.Compose([
        transforms.Resize(32),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=np.array([0.485, 0.456, 0.406]),
            std=np.array([0.229, 0.224, 0.225])),
    ])
    # prepare dataset by ImageFolder, data should be classified by directory
    train_set = torchvision.datasets.ImageFolder(root='./mnist/train', transform=transform)
    train_loader = torch.utils.data.DataLoader(train_set, batch_size=32, shuffle=False)
    features, labels = [], []  # store features and labels
    for i, (data, target) in enumerate(train_loader):
        # load data to gpu
        data, target = data.cuda(device='cuda:1'), target.cuda(device='cuda:1')
        data, target = Variable(data), Variable(target)
        # forward
        output = model(data)
        # get features and labels to make knn classifier
        features.extend(output.data.cpu().numpy())
        labels.extend(target.data.cpu().numpy())
    # n_neighbors is adjustable
    clf = neighbors.KNeighborsClassifier(n_neighbors=n_neighbors)
    clf.fit(features, labels)
    return clf
def find_nearest_image(feature, label, model, clf):
    model.eval()
    # preprocessing function for image
    transform = transforms.Compose([
        transforms.Resize(32),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=np.array([0.485, 0.456, 0.406]),
            std=np.array([0.229, 0.224, 0.225])),
    ])
    frame = pandas.read_csv('./mnist/train.csv')
    # prepare dataset by ImageFolder, data should be classified by directory
    train_set = torchvision.datasets.ImageFolder(root = './mnist/train', transform = transform)
    train_loader = torch.utils.data.DataLoader(train_set, batch_size = 32, shuffle = False)
    features = []
    targets = []
    for i, (data, target) in enumerate(train_loader):
        # load data to gpu
        data, target = data.cuda(device='cuda:1'), target.cuda(device='cuda:1')
        data, target = Variable(data), Variable(target)
        # forward
        output = model.forward(data)
        features.extend(output.data.cpu().numpy())
        targets.extend(target.data.cpu().numpy())
    # linear search for the training image of class `label` nearest to `feature`
    min_index = -1
    min_dist = 0
    for index in range(len(features)):
        if targets[index] == label:
            dist = np.linalg.norm(feature - features[index])
            # the first match initializes the search; later matches must be strictly closer
            if min_index == -1 or dist < min_dist:
                min_dist = dist
                min_index = index
    # relies on train.csv listing images in the same order as ImageFolder (shuffle=False)
    return os.path.join('./mnist/train/', str(frame['name'][min_index]), str(frame['id'][min_index]) + '.png')
def test_facenet(epoch, model, clf, test = True, last = False):
    model.eval()
    # preprocessing function for image
    transform = transforms.Compose([
        transforms.Resize(32),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=np.array([0.485, 0.456, 0.406]),
            std=np.array([0.229, 0.224, 0.225])),
    ])
    frame = pandas.read_csv('./mnist/test.csv')
    # prepare dataset by ImageFolder, data should be classified by directory
    test_set = torchvision.datasets.ImageFolder(root = './mnist/test' if test else './mnist/train', transform = transform)
    test_loader = torch.utils.data.DataLoader(test_set, batch_size = 32, shuffle = False)
    correct, total = 0, 0
    features = []
    predicts = []
    targets = []
    for i, (data, target) in enumerate(test_loader):
        # load data to gpu
        data, target = data.cuda(device='cuda:1'), target.cuda(device='cuda:1')
        data, target = Variable(data), Variable(target)
        # forward
        output = model.forward(data)
        # predict by knn classifier
        predicted = clf.predict(output.data.cpu().numpy())
        correct += (torch.tensor(predicted) == target.data.cpu()).sum().item()
        total += target.size(0)
        if test and last:
            features.extend(output.data.cpu().numpy())
            targets.extend(target.data.cpu().numpy())
            predicts.extend(torch.tensor(predicted).numpy())
    if test and last:
        os.makedirs('./pics/errors/', exist_ok=True)  # make sure the output folder exists
        err_count = 0
        for index in range(len(features)):
            if not predicts[index] == targets[index]:
                image_path = os.path.join('./mnist/test/', str(frame['name'][index]), str(frame['id'][index]) + '.png')
                image = Image.open(image_path).convert('RGB')
                plt.subplot(1,3,1)
                plt.imshow(image)
                # save origin images
                error_image_path = os.path.join('./pics/errors/', 'img%d_origin_(%d).png' %(err_count, targets[index]))
                image.save(error_image_path)
                path_nearest_target = find_nearest_image(features[index], targets[index], model, clf)
                path_nearest_predict = find_nearest_image(features[index], predicts[index], model, clf)
                image_nearest_target = Image.open(path_nearest_target).convert('RGB')
                image_nearest_predict = Image.open(path_nearest_predict).convert('RGB')
                plt.subplot(1,3,2)
                plt.imshow(image_nearest_target)
                plt.subplot(1,3,3)
                plt.imshow(image_nearest_predict)
                plt.show()
                # save nearest target
                target_image_path = os.path.join('./pics/errors/', 'img%d_near_tgt_(%d).png' %(err_count, targets[index]))
                image_nearest_target.save(target_image_path)
                # save nearest predict
                predict_image_path = os.path.join('./pics/errors/', 'img%d_near_pdc_(%d).png' %(err_count, predicts[index]))
                image_nearest_predict.save(predict_image_path)
                err_count += 1
        print("Error images saved!")
    context = 'Accuracy of model in ' + ('test' if test else 'train') + \
              ' set is {}/{}({:.2f}%)'.format(correct, total, 100. * float(correct) / float(total))
    print(context)
3.6 Training and testing
def run_facenet():
    # hyper parameter
    lr = 0.008
    margin = 2.0
    num_triplets = 8000
    n_neighbors = 5
    embedding_size = 128
    num_epochs = 5
    # embedding_size is adjustable
    model = ResNet18(embedding_size, 10)
    # load model into GPU device
    device = torch.device('cuda:1')
    model = model.to(device)
    # compare device.type: a torch.device never equals the plain string 'cuda'.
    # The model stays on the single device cuda:1, where the train and test
    # functions send their data, so it is not wrapped in DataParallel.
    if device.type == 'cuda':
        cudnn.benchmark = True
    # define the optimizer; lr, momentum and weight_decay are adjustable
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4)
    print('start training')
    for epoch in range(num_epochs):
        train_facenet(epoch, model, optimizer, margin, num_triplets)  # train resnet18 with triplet loss
        clf = KNN_classifier(model, epoch, n_neighbors)  # get knn classifier
        last = False
        if epoch == num_epochs - 1:
            last = True
        test_facenet(epoch, model, clf, False)  # validate train set
        test_facenet(epoch, model, clf, True, last)  # validate test set
        # divide the learning rate by 3 every 4 epochs
        if (epoch + 1) % 4 == 0:
            lr = lr / 3
            for param_group in optimizer.param_groups:
                param_group['lr'] = lr

run_facenet()
start training
Train Epoch: 0 [8000/8000], Average loss: 0.3779
Accuracy of model in train set is 1899/2000(94.95%)
Accuracy of model in test set is 939/1000(93.90%)
Train Epoch: 1 [8000/8000], Average loss: 0.0987
Accuracy of model in train set is 1967/2000(98.35%)
Accuracy of model in test set is 960/1000(96.00%)
Train Epoch: 2 [8000/8000], Average loss: 0.0527
Accuracy of model in train set is 1970/2000(98.50%)
Accuracy of model in test set is 971/1000(97.10%)
Train Epoch: 3 [8000/8000], Average loss: 0.0376
Accuracy of model in train set is 1983/2000(99.15%)
Accuracy of model in test set is 985/1000(98.50%)
Train Epoch: 4 [8000/8000], Average loss: 0.0165
Accuracy of model in train set is 1994/2000(99.70%)
Error images saved!
Accuracy of model in test set is 980/1000(98.00%)
Question 5
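Train a reasonably good resnet18 network and collect every misclassified sample image in the test set (there are 1000 test images, and no more than 30 should be misclassified; 5%). For each such sample, find the nearest same-class sample and the nearest wrong-class sample in the training set, and give a brief analysis (15%). For example, for a sample whose correct class is A and which the model misclassifies as B, find the class-A training image and the class-B training image nearest to the sample. (Note: these must be images! Be sure to save them in the pics folder or a custom folder and submit them together, otherwise the TA cannot see them. Display the images below.)
hints: rewrite the test_facenet() function
hints: to trace a feature back to its image, refer to the code below. Make sure shuffle=False; train.csv and test.csv are both provided.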
Answer 5
To collect the errors on the test set, a directory named ./pics/errors stores the wrong predictions. test_facenet() is rewritten as above. Each error is saved under a name of the form img + err_count + _origin (or _near_tgt, _near_pdc) + (label).
From the results, most errors occur when the handwriting is too blurry or the digits look too similar. Confusions between 0 and 6, 2 and 3, 3 and 8, 5 and 3, and so on are common.
3.7 Hard triplet
The performance of triplet loss depends heavily on the sampling strategy. Here we briefly describe two hard-triplet sampling strategies: batch-hard and semi-hard.
Batch hard
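For each minibatch, randomly choose P classes and randomly pick K different images from each class, so a minibatch contains P×K different images. Every image serves as an anchor: within the minibatch, find the positive sample farthest from the anchor and the negative sample nearest to it, and form a triplet from them. The loss can be written as:

$$L_{BH} = \sum_{i=1}^{P} \sum_{a=1}^{K} \Big[\, m + \max_{p=1,\dots,K} D\big(f(x_a^i), f(x_p^i)\big) - \min_{\substack{j=1,\dots,P \\ n=1,\dots,K \\ j \neq i}} D\big(f(x_a^i), f(x_n^j)\big) \Big]_+$$

where x_a^i is the a-th image of the i-th class, f is the embedding network, D is the distance between two embeddings, m is the margin, and [·]_+ denotes max(·, 0).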
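The batch-hard selection described above can be sketched without any framework. The version below is only an illustration under stated assumptions: it works on plain Python lists, averages over anchors (mirroring the mean in the TripletLoss class of 3.1) instead of summing, and uses made-up 2-d embeddings. A PyTorch version would typically compute the full pairwise distance matrix once and apply boolean masks.

```python
import math

def batch_hard_loss(embeddings, labels, margin):
    """Mean over anchors of max(margin + hardest_positive - hardest_negative, 0)."""
    losses = []
    for a, (emb_a, lab_a) in enumerate(zip(embeddings, labels)):
        # Distances to the other samples of the same class.
        pos = [math.dist(emb_a, e)
               for i, (e, l) in enumerate(zip(embeddings, labels))
               if l == lab_a and i != a]
        # Distances to all samples of other classes.
        neg = [math.dist(emb_a, e)
               for e, l in zip(embeddings, labels) if l != lab_a]
        if not pos or not neg:
            continue  # this anchor cannot form a triplet
        # Hardest positive is the farthest one, hardest negative the nearest one.
        losses.append(max(margin + max(pos) - min(neg), 0.0))
    return sum(losses) / len(losses) if losses else 0.0

# Toy minibatch with P = 2 classes and K = 2 images per class.
embeddings = [[0.0, 0.0], [1.0, 0.0], [3.0, 0.0], [4.0, 0.0]]
labels = [0, 0, 1, 1]

loss = batch_hard_loss(embeddings, labels, margin=2.0)
print(loss)
```

In this toy batch the two inner points (at 1 and 3) each contribute a loss of 1 and the two outer points contribute 0, so the mean is 0.5.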

Semi hard
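Unlike batch-hard, semi-hard sampling does not need to pick the hardest (nearest) negative sample for the anchor in the minibatch. A triplet counts as semi-hard as long as the anchor-to-positive distance is smaller than the anchor-to-negative distance; in the usual FaceNet definition the negative also still lies within the margin, that is, d(a, p) < d(a, n) < d(a, p) + margin.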
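A minimal sketch of semi-hard selection follows. It assumes the common definition in which a negative is semi-hard when it is farther from the anchor than the positive but still inside the margin; the function name semi_hard_negatives and the toy points are hypothetical.

```python
import math

def semi_hard_negatives(anchor, positive, negatives, margin):
    """Indices of negatives with d(a, p) < d(a, n) < d(a, p) + margin."""
    d_ap = math.dist(anchor, positive)
    return [i for i, neg in enumerate(negatives)
            if d_ap < math.dist(anchor, neg) < d_ap + margin]

anchor = [0.0, 0.0]
positive = [1.0, 0.0]    # d(a, p) = 1
negatives = [
    [0.5, 0.0],          # d = 0.5: closer than the positive (hard)
    [2.0, 0.0],          # d = 2.0: between 1 and 1 + margin (semi-hard)
    [5.0, 0.0],          # d = 5.0: beyond the margin (easy, zero loss)
]

selected = semi_hard_negatives(anchor, positive, negatives, margin=2.0)
print(selected)
```

Any of the selected negatives yields a positive loss without being the single hardest example, which tends to make training less sensitive to outliers and mislabeled images.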

Question 6
This is the last experiment on the classification task. What are your thoughts on what you have learned about classification?
Answer 6
- It's really onerous work for only a tiny lift!
- Sometimes the GPU overcomes the computing problem, but this also blinds us to some extent: we usually do not attach much importance to computational difficulty. (Bad for algorithm innovation?)
- The performance of such complicated NNs is hard to judge conclusively, because we lack support from other cases.
- Some intriguing tricks such as triplet loss and BCE are introduced in the modules, which inspire and motivate us to explore further.
- ResNet and DenseNet are really siblings; they come from one family! It's interesting to see how they diverge.
- From my own perspective, the work after ResNet does not show much progress (probably because of the limitations of the dataset), so the advancement is still murky to me.
- I really admire the intelligence of the people who came up with the similarity-comparison and loss-calculation methods. That's really amazing.
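Bonus assignment:
Implement either batch-hard or semi-hard in pytorch and retrain resnet18. Compared with the random triplet sampling above, how do the training process and the results differ? Do you have a better method? (No points are deducted for skipping this; implementing one earns substantial extra credit. Students are encouraged to take on the challenge.)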