NSDT工具推荐: Three.js AI纹理开发包 - YOLO合成数据生成器 - GLTF/GLB在线编辑 - 3D模型格式在线转换 - 可编程3D场景编辑器 - REVIT导出3D模型插件 - 3D模型语义搜索引擎 - AI模型在线查看 - Three.js虚拟轴心开发包 - 3D模型在线减面 - STL模型在线切割 - 3D道路快速建模
在上一篇文章中我们全面介绍了如何使用 fast.ai 构建自动驾驶卡车模拟器,最终这些方法可以处理任何你需要微调预训练模型或开发预测包围框和分类的情况。
现在我的目标是逐步了解训练和推理过程的一些更技术性的方面,并解释它们如何在 PyTorch 中实现的细节。 你可以参考此 Github 存储库中的代码库。
回想上一篇文章,有两个神经网络在工作。
- 预测转弯方向的 DNN。
- 预测汽车、人等的包围框和类别的 DNN。
1、微调转弯方向模型
两个网络都以预训练的 resnet34 网络开始,并针对适当的任务进行微调。
可以从 torchvision.models
获得预训练的 resnet34:
import torchvision.models as models
arch = models.resnet34(pretrained=True)
所有预训练模型都已在 1000 个分类的 Imagenet 数据集上进行了预训练。
为了微调预训练网络,我们基本上是从一组权重开始,这些权重已经包含了很多关于嵌入其中的 Imagenet 数据集的信息。 所以我们可以用两种方法之一来做到这一点。 一种方法是通过设置 requires_grad=False
来冻结所有早期层,然后只为最后一层设置 requires_grad=True
。 另一种方法只使用所有权重作为初始化值并继续对我们的新训练数据进行训练。
对于冻结早期层并仅训练最终层的选项 1,我们可以为所有层设置 requires_grad=False
,然后删除并替换最后一层。无论何时将层分配给网络,它都会自动将 requires_grad
属性设置为 True 。
class Flatten(nn.Module):
def __init__(self):
super(Flatten, self).__init__()
def forward(self, x):
x = x.view(x.size(0), -1)
return x
class normalize(nn.Module):
def __init__(self):
super(normalize, self).__init__()
def forward(self, x):
x = F.normalize(x, p=2, dim=1)
return x
layer_list = list(arch.children())[-2:]
arch = nn.Sequential(*list(arch.children())[:-2])
arch.avgpool = nn.AdaptiveAvgPool2d(output_size=(1,1))
arch.fc = nn.Sequential(
Flatten(),
nn.Linear(in_features=layer_list[1].in_features,
out_features=3,
bias=True),
normalize()
)
arch = arch.to(device)
如果查看 resnet34 的架构,你会发现最后一个 conv
块后跟一个 AdaptiveAvgPool2d
和一个 Linear
层。
我们可以使用 nn.Sequential(*list(arch.children())[:-2])
删除最后两层,然后使用 arch.avgpool = nn.AdaptiveAvgPool2d(output_size=(1 ,1))
和另一个带有 Flatten、Linear 和 normalize 层的 nn.Sequential
。 我们最终想要预测 3 个类别:左、右、直——所以我们的 out_features
将为 3。
现在我们将为方向模型创建数据集和数据加载器。 由于我们的数据只是图像和分类 [left, right, straight]
,我们可以只使用内置的 torch 数据集类,但无论如何我都喜欢使用自定义类,因为我可以准确地看到数据是如何更容易地提取的。
class DirectionsDataset(Dataset):
"""Directions dataset."""
def __init__(self, csv_file, root_dir, transform=None):
"""
Args:
csv_file (string): Path to the csv file with labels.
root_dir (string): Directory with all the images.
transform (callable, optional): Optional transform
"""
self.label = pd.read_csv(csv_file)
self.root_dir = root_dir
self.transform = transform
def __len__(self):
return len(self.label)
def __getitem__(self, idx):
img_name = os.path.join(self.root_dir,
self.label.iloc[idx, 0])
image = io.imread(img_name+'.jpg')
sample = image
label = self.label.iloc[idx, 1]
if self.transform:
sample = self.transform(sample)
return sample, label
在 csv 文件中的图像名称没有扩展名,因此是 img_name+'.jpg'
。
tensor_dataset = DirectionsDataset(csv_file='data/labels_directions.csv',
root_dir='data/train3/',
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize(
(0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]))
dataloader = DataLoader(tensor_dataset, batch_size=16, shuffle=True)
所以我们准备开始训练模型。
def train_model(model, criterion, optimizer, scheduler,
dataloader, num_epochs=25):
since = time.time()
FT_losses = []
best_model_wts = copy.deepcopy(model.state_dict())
best_acc = 0.0
iters = 0
for epoch in range(num_epochs):
print('Epoch {}/{}'.format(epoch, num_epochs - 1))
print('-' * 10)
scheduler.step()
model.train() # Set model to training mode
running_loss = 0.0
running_corrects = 0
# Iterate over data.
for i, (inputs, labels) in enumerate(dataloader):
#set_trace()
inputs = inputs.to(device)
labels = labels.to(device)
# zero the parameter gradients
optimizer.zero_grad()
# forward
# track history if only in train
model.eval() # Set model to evaluate mode
with torch.no_grad():
outputs = model(inputs)
#set_trace()
_, preds = torch.max(outputs, 1)
outputs = model(inputs)
loss = criterion(outputs, labels)
# backward + optimize only if in training phase
loss.backward()
optimizer.step()
FT_losses.append(loss.item())
# statistics
running_loss += loss.item() * inputs.size(0)
running_corrects += torch.sum(preds == labels.data)
#set_trace()
iters += 1
if iters % 2 == 0:
print('Prev Loss: {:.4f} Prev Acc: {:.4f}'.format(
loss.item(), torch.sum(preds == labels.data) / inputs.size(0)))
epoch_loss = running_loss / dataset_size
epoch_acc = running_corrects.double() / dataset_size
print('Loss: {:.4f} Acc: {:.4f}'.format(
epoch_loss, epoch_acc))
# deep copy the model
if epoch_acc > best_acc:
best_acc = epoch_acc
best_model_wts = copy.deepcopy(model.state_dict())
time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(
time_elapsed // 60, time_elapsed % 60))
print('Best val Acc: {:4f}'.format(best_acc))
# load best model weights
model.load_state_dict(best_model_wts)
return model, FT_losses
在这个训练循环中,如果 epoch 准确度是目前为止最好的,我们可以跟踪最佳模型权重。 我们还可以跟踪每次迭代和每个时期的损失,并在最后返回以绘制并查看调试或演示的样子。
请记住,模型在每次迭代时都在接受训练,如果你停止训练循环,它将保留这些权重,只需再次运行 train_model()
命令即可再次继续训练。 要再次从头开始,请返回并使用预训练架构重新初始化权重。
criterion = nn.CrossEntropyLoss()
# Observe that all parameters are being optimized
optimizer_ft = optim.SGD(arch.parameters(), lr=1e-2, momentum=0.9)
# Decay LR by a factor of *gamma* every *step_size* epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)
arch, FT_losses = train_model(arch, criterion, optimizer_ft, exp_lr_scheduler, dataloader, num_epochs=5)
2、微调包围框模型
同样,我们将使用预训练的 resnet34 架构。 然而,这一次我们将不得不对其进行更实质性的编辑,以输出类别预测和边界框值。 此外,这是一个多类别预测问题,因此可能有 1 个边界框,也可能有 15 个——因此同时也有 1 个或 15 个类别。
我们将以类似于替换方向模型中的图层的方式为架构创建自定义头。
class StdConv(nn.Module):
def __init__(self, nin, nout, stride=2, drop=0.1):
super().__init__()
self.conv = nn.Conv2d(nin, nout, 3, stride=stride, padding=1)
self.bn = nn.BatchNorm2d(nout)
self.drop = nn.Dropout(drop)
def forward(self, x):
return self.drop(self.bn(F.relu(self.conv(x))))
def flatten_conv(x,k):
bs,nf,gx,gy = x.size()
x = x.permute(0,2,3,1).contiguous()
return x.view(bs,-1,nf//k)
class OutConv(nn.Module):
def __init__(self, k, nin, bias):
super().__init__()
self.k = k
self.oconv1 = nn.Conv2d(nin, (len(id2cat)+1)*k, 3, padding=1)
self.oconv2 = nn.Conv2d(nin, 4*k, 3, padding=1)
self.oconv1.bias.data.zero_().add_(bias)
def forward(self, x):
return [flatten_conv(self.oconv1(x), self.k),
flatten_conv(self.oconv2(x), self.k)]
drop=0.4
class SSD_MultiHead(nn.Module):
def __init__(self, k, bias):
super().__init__()
self.drop = nn.Dropout(drop)
self.sconv0 = StdConv(512,256, stride=1, drop=drop)
self.sconv1 = StdConv(256,256, drop=drop)
self.sconv2 = StdConv(256,256, drop=drop)
self.sconv3 = StdConv(256,256, drop=drop)
self.out0 = OutConv(k, 256, bias)
self.out1 = OutConv(k, 256, bias)
self.out2 = OutConv(k, 256, bias)
self.out3 = OutConv(k, 256, bias)
def forward(self, x):
x = self.drop(F.relu(x))
x = self.sconv0(x)
x = self.sconv1(x)
o1c,o1l = self.out1(x)
x = self.sconv2(x)
o2c,o2l = self.out2(x)
x = self.sconv3(x)
o3c,o3l = self.out3(x)
return [torch.cat([o1c,o2c,o3c], dim=1),
torch.cat([o1l,o2l,o3l], dim=1)]
现在我们需要将这个自定义头连接到 resnet34 架构,有一个方便的函数可以做到这一点:
class ConvnetBuilder():
def __init__(self, f, c, is_multi, is_reg, ps=None,
xtra_fc=None, xtra_cut=0,
custom_head=None,pretrained=True):
self.f,self.c,self.is_multi,self.is_reg,self.xtra_cut = f,c,is_multi,is_reg,xtra_cut
xtra_fc = [512]
ps = [0.25]*len(xtra_fc) + [0.5]
self.ps,self.xtra_fc = ps,xtra_fc
cut,self.lr_cut = [8,6] # specific to resnet_34 arch
cut-=xtra_cut
layers = cut_model(f(pretrained), cut)
self.nf = num_features(layers)*2
self.top_model = nn.Sequential(*layers)
n_fc = len(self.xtra_fc)+1
self.ps = [self.ps]*n_fc
fc_layers = [custom_head]
self.n_fc = len(fc_layers)
self.fc_model = nn.Sequential(*fc_layers).to(device)
self.model = nn.Sequential(*(layers+fc_layers)).to(device)
def cut_model(m, cut):
return list(m.children())[:cut] if cut else [m]
def num_features(m):
c=children(m)
if len(c)==0: return None
for l in reversed(c):
if hasattr(l, 'num_features'): return l.num_features
res = num_features(l)
if res is not None: return res
def children(m): return m if isinstance(m, (list, tuple)) else list(m.children())
使用这个 ConvnetBuilder 类,我们可以结合自定义头和 resnet34 架构。
k = len(anchor_scales)
head_reg4 = SSD_MultiHead(k, -4.)
f_model = models.resnet34
modelss = ConvnetBuilder(f_model, 0, 0, 0, custom_head=head_reg4)
k是9。
我们现在可以通过 models
的 model
属性访问模型。
损失函数必须能够接受分类(类)和连续值(边界框)并输出单个损失值。
def ssd_loss(pred,targ,print_it=False):
lcs,lls = 0.,0.
for b_c,b_bb,bbox,clas in zip(*pred,*targ):
loc_loss,clas_loss = ssd_1_loss(b_c,b_bb,bbox,clas,print_it)
lls += loc_loss
lcs += clas_loss
if print_it:
print(f'loc: {lls.data.item()}, clas: {lcs.data.item()}')
return lls+lcs
def ssd_1_loss(b_c,b_bb,bbox,clas,print_it=False):
bbox,clas = get_y(bbox,clas)
a_ic = actn_to_bb(b_bb, anchors)
overlaps = jaccard(bbox.data, anchor_cnr.data)
gt_overlap,gt_idx = map_to_ground_truth(overlaps,print_it)
gt_clas = clas[gt_idx]
pos = gt_overlap > 0.4
pos_idx = torch.nonzero(pos)[:,0]
gt_clas[1-pos] = len(id2cat)
gt_bbox = bbox[gt_idx]
loc_loss = ((a_ic[pos_idx] - gt_bbox[pos_idx]).abs()).mean()
clas_loss = loss_f(b_c, gt_clas)
return loc_loss, clas_loss
def one_hot_embedding(labels, num_classes):
return torch.eye(num_classes)[labels.data.long().cpu()]
class BCE_Loss(nn.Module):
def __init__(self, num_classes):
super().__init__()
self.num_classes = num_classes
def forward(self, pred, targ):
t = one_hot_embedding(targ, self.num_classes+1)
t = V(t[:,:-1].contiguous()).cpu()
x = pred[:,:-1]
w = self.get_weight(x,t)
return F.binary_cross_entropy_with_logits(x, t, w, size_average=False)/self.num_classes
def get_weight(self,x,t): return None
loss_f = BCE_Loss(len(id2cat))
def get_y(bbox,clas):
bbox = bbox.view(-1,4)/sz
bb_keep = ((bbox[:,2]-bbox[:,0])>0).nonzero()[:,0]
return bbox[bb_keep],clas[bb_keep]
def actn_to_bb(actn, anchors):
actn_bbs = torch.tanh(actn)
actn_centers = (actn_bbs[:,:2]/2 * grid_sizes) + anchors[:,:2]
actn_hw = (actn_bbs[:,2:]/2+1) * anchors[:,2:]
return hw2corners(actn_centers, actn_hw)
def intersect(box_a, box_b):
max_xy = torch.min(box_a[:, None, 2:], box_b[None, :, 2:])
min_xy = torch.max(box_a[:, None, :2], box_b[None, :, :2])
inter = torch.clamp((max_xy - min_xy), min=0)
return inter[:, :, 0] * inter[:, :, 1]
def box_sz(b): return ((b[:, 2]-b[:, 0]) * (b[:, 3]-b[:, 1]))
def jaccard(box_a, box_b):
inter = intersect(box_a, box_b)
union = box_sz(box_a).unsqueeze(1) + box_sz(box_b).unsqueeze(0) - inter
return inter / union
一旦我们设置了数据集和数据加载器,就可以在 bbox 模型的批量输出上测试损失函数。
这里我们实际上需要一个自定义数据集类来处理这些数据类型。
class BboxDataset(Dataset):
"""Bbox dataset."""
def __init__(self, csv_file, root_dir, transform=None):
"""
Args:
csv_file (string): Path to csv file with bounding boxes.
root_dir (string): Directory with all the images.
transform (callable, optional): Optional transform.
"""
self.label = pd.read_csv(csv_file)
self.root_dir = root_dir
self.transform = transform
self.sz = 224
def __len__(self):
return len(self.label)
def __getitem__(self, idx):
img_name = os.path.join(self.root_dir,
self.label.iloc[idx, 0])
image = io.imread(img_name)
sample = image
h, w = sample.shape[:2]; new_h, new_w = (224,224)
bb = np.array([float(x) for x in self.label.iloc[idx, 1].split(' ')], dtype=np.float32)
bb = np.reshape(bb, (int(bb.shape[0]/2),2))
bb = bb * [new_h / h, new_w / w]
bb = bb.flatten()
bb = T(np.concatenate((np.zeros((189*4) - len(bb)), bb), axis=None)) # 189 is 21 * 9 where 9 = k
if self.transform:
sample = self.transform(sample)
return sample, bb
这个自定义数据集类处理边界框,但我们想要一个处理类和边界框的数据集类。
bb_dataset = BboxDataset(csv_file='data/pascal/tmp/mbb.csv',
root_dir='data/pascal/VOCdevkit2/VOC2007/JPEGImages/',
transform=transforms.Compose([
transforms.ToPILImage(),
transforms.Resize((224,224)),
transforms.ToTensor(),
transforms.Normalize(
(0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]))
bb_dataloader = DataLoader(bb_dataset, batch_size=16, shuffle=True)
在这里,我们可以连接两个数据集类,以便为每个图像返回类和边界框。
class ConcatLblDataset(Dataset):
def __init__(self, ds, y2):
self.ds,self.y2 = ds,y2
self.sz = ds.sz
def __len__(self): return len(self.ds)
def __getitem__(self, i):
self.y2[i] = np.concatenate((np.zeros(189 - len(self.y2[i])), self.y2[i]), axis=None)
x,y = self.ds[i]
return (x, (y,self.y2[i]))
trn_ds2 = ConcatLblDataset(bb_dataset, mcs)
其中 mcs
是一个 numpy 数组,其中包含每个训练图像的类。
PATH_pascal = Path('data/pascal')
trn_j = json.load((PATH_pascal / 'pascal_train2007.json').open())
cats = dict((o['id'], o['name']) for o in trn_j['categories'])
mc = [[cats[p[1]] for p in trn_anno[o]] for o in trn_ids]
id2cat = list(cats.values())
cat2id = {v:k for k,v in enumerate(id2cat)}
mcs = np.array([np.array([cat2id[p] for p in o]) for o in mc])
现在我们可以测试自定义损失。
sz=224
x,y = next(iter(bb_dataloader2))
batch = modelss.model(x)
ssd_loss(batch, y, True)
tensor([0.6254])
tensor([0.6821, 0.7257, 0.4922])
tensor([0.9563])
tensor([0.6522, 0.5276, 0.6226])
tensor([0.6811, 0.3338])
tensor([0.7008])
tensor([0.5316, 0.2926])
tensor([0.9422])
tensor([0.5487, 0.7187, 0.3620, 0.1578])
tensor([0.6546, 0.3753, 0.4231, 0.4663, 0.2125, 0.0729])
tensor([0.3756, 0.5085])
tensor([0.2304, 0.1390, 0.0853])
tensor([0.2484])
tensor([0.6419])
tensor([0.5954, 0.5375, 0.5552])
tensor([0.2383])
loc: 1.844399333000183, clas: 79.79206085205078
Out[1024]:
tensor(81.6365, grad_fn=<AddBackward0>)
现在训练ssd模型:
beta1 = 0.5
optimizer = optim.Adam(modelss.model.parameters(), lr=1e-3, betas=(beta1, 0.99))
# Decay LR by a factor of *gamma* every *step_size* epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
我们可以使用与以前基本相同的 train_model()
函数,但这次我们将边界框和类的列表传递给损失函数 ssd_loss()
。
现在我们已经在新的训练数据集上训练了我们的两个模型,准备好使用它们对我们的卡车模拟器游戏进行推理。
玩得开心!
原文链接:Autonomous Truck Simulator with PyTorch — finetuning and single shot detectors
BimAnt翻译整理,转载请标明出处