

论文:Towards Real-Time Multi-Object Tracking


论文针对SDE(Separate Detection and Embedding,先检测、再单独提取embedding)跟踪范式的弊端,提出one-stage的JDE(Joint Detection and Embedding)跟踪范式。

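采用YOLOv3模型,网络结构基本相同,只是在prediction head中多出一个分支用于学习embedding。每个预测头都被建模为多任务学习问题,prediction head的输出大小为 $(6A+D)\times H\times W$,其中 $A$ 是该尺度下的anchor数量,$D$ 是embedding维度,$H\times W$ 是特征图大小。通道分配如下:$4A$ 个通道用于box回归,$2A$ 个通道用于前景/背景置信度分类,$D$ 个通道用于embedding。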


跟踪轨迹中的每一个物体都有唯一的track ID,为得到分类信息,在embedding后引入全连接层,借鉴物体分类思路(将每个track ID当做一个类别),将embedding信息转化为track ID的分类信息,论文给的图像中没有说明,补全效果如下





class YOLOLayer(nn.Module):
    """Prediction head for one YOLO scale with an extra embedding branch.

    The incoming feature map is split along the channel axis into detection
    channels (``nA * (nC + 5)`` of them) and embedding channels (the rest).
    When ``targets`` is given, the box, confidence and identity losses are
    computed and combined using the learnable scalars ``s_r``, ``s_c`` and
    ``s_id``.

    Args:
        anchors: Anchor sizes for this scale; stored as a ``FloatTensor``.
        nC: Number of object classes.
        nID: Number of identities (track IDs) used by the ID classifier.
        nE: Embedding dimension.
        img_size: Accepted for interface compatibility; ``self.img_size`` is
            initialised to 0 regardless and updated via ``create_grids``.
        yolo_layer: Stored as ``self.layer``.
    """

    def __init__(self, anchors, nC, nID, nE, img_size, yolo_layer):
        super(YOLOLayer, self).__init__()
        self.layer = yolo_layer
        nA = len(anchors)
        self.anchors = torch.FloatTensor(anchors)
        # Number of anchors at this scale. The upstream comment says 3; the
        # article notes that both the code and the paper actually use 4.
        self.nA = nA
        self.nC = nC    # number of classes
        self.nID = nID  # number of identities
        self.img_size = 0
        self.emb_dim = nE
        self.shift = [1, 3, 5]

        self.SmoothL1Loss = nn.SmoothL1Loss()
        self.SoftmaxLoss = nn.CrossEntropyLoss(ignore_index=-1)
        self.CrossEntropyLoss = nn.CrossEntropyLoss()
        # Loss function for the identity (track ID) classification.
        self.IDLoss = nn.CrossEntropyLoss(ignore_index=-1)

        # Learnable scalars weighting each loss as exp(-s) * loss + s.
        self.s_c = nn.Parameter(-4.15 * torch.ones(1))  # -4.15
        self.s_r = nn.Parameter(-4.85 * torch.ones(1))  # -4.85
        self.s_id = nn.Parameter(-2.3 * torch.ones(1))  # -2.3

        self.emb_scale = math.sqrt(2) * math.log(self.nID - 1) if self.nID > 1 else 1

    def forward(self, p_cat, img_size, targets=None, classifier=None, test_emb=False):
        """Split the head output and, if targets are given, compute the losses.

        Args:
            p_cat: Head output; detection channels first, embedding channels after.
            img_size: Input image size; grids are rebuilt when it changes.
            targets: Per-image ground truth, or None.
            classifier: Callable mapping embeddings to identity logits.
            test_emb: If true, return embeddings with their ground-truth ids
                instead of losses.

        Returns:
            With ``targets`` and ``test_emb``: a tensor whose rows are
            ``[embedding, id]`` (empty, with ``emb_dim + 1`` columns, when
            nothing matched). With ``targets`` only: the tuple
            ``(loss, loss.item(), lbox.item(), lconf.item(), lid.item(), nT)``.
        """
        # The first nA * (nC + 5) channels hold box + conf values (24 in the
        # original notes: 4 anchors x (4 box + 2 conf)); deriving the count
        # keeps the split consistent with the view() below for any nA / nC.
        n_det = self.nA * (self.nC + 5)
        p, p_emb = p_cat[:, :n_det, ...], p_cat[:, n_det:, ...]
        nB, nGh, nGw = p.shape[0], p.shape[-2], p.shape[-1]  # batch size, grid height, grid width
        device = p.device

        if self.img_size != img_size:
            create_grids(self, img_size, nGh, nGw)
            if p.is_cuda:
                self.grid_xy = self.grid_xy.to(device)
                self.anchor_wh = self.anchor_wh.to(device)

        # Prediction: regroup the detection channels per anchor, each anchor
        # carrying 4 box values followed by 2 confidence values.
        p = p.view(nB, self.nA, self.nC + 5, nGh, nGw).permute(0, 1, 3, 4, 2).contiguous()
        p_emb = p_emb.permute(0, 2, 3, 1).contiguous()  # embedding, (nB, nGh, nGw, emb channels)
        p_box = p[..., :4]                               # boxes, (nB, nA, nGh, nGw, 4)
        p_conf = p[..., 4:6].permute(0, 4, 1, 2, 3)      # conf, (nB, 2, nA, nGh, nGw)

        # Training
        if targets is not None:
            build_targets = build_targets_max if test_emb else build_targets_thres
            tconf, tbox, tids = build_targets(
                targets, self.anchor_vec.to(device), self.nA, self.nC, nGh, nGw)
            # Confidence, box and id targets; each has one layer per anchor.
            tconf, tbox, tids = tconf.to(device), tbox.to(device), tids.to(device)
            mask = tconf > 0

            # Compute losses
            nT = sum(len(x) for x in targets)  # number of targets
            nM = mask.sum().float()            # number of anchors assigned to targets
            if nM > 0:
                lbox = self.SmoothL1Loss(p_box[mask], tbox[mask])
            else:
                lbox = torch.zeros(1, device=device)
            lconf = self.SoftmaxLoss(p_conf, tconf)
            lid = torch.zeros((), device=device)
            emb_mask, _ = mask.max(1)

            # For convenience we use max(1) to decide the id.
            # TODO: more reasonable strategy
            tids, _ = tids.max(1)
            tids = tids[emb_mask]
            embedding = p_emb[emb_mask].contiguous()
            embedding = self.emb_scale * F.normalize(embedding)

            if test_emb:
                if embedding.numel() == 0 or tids.numel() == 0:
                    return torch.zeros(0, self.emb_dim + 1, device=device)
                return torch.cat([embedding, tids.float()], dim=1)

            if len(embedding) > 1:
                # Fully connected classification: one class per track ID.
                logits = classifier(embedding).contiguous()
                lid = self.IDLoss(logits, tids.squeeze())

            # Sum loss components
            loss = (torch.exp(-self.s_r) * lbox
                    + torch.exp(-self.s_c) * lconf
                    + torch.exp(-self.s_id) * lid
                    + (self.s_r + self.s_c + self.s_id))
            loss *= 0.5
            return loss, loss.item(), lbox.item(), lconf.item(), lid.item(), nT

        # NOTE(review): no inference branch is visible in this excerpt; with
        # targets=None the method falls through and returns None.


class Darknet(nn.Module):
    """YOLOv3 object detection model.

    Built from a parsed model configuration (or a path that is parsed with
    ``parse_model_cfg``). When ``nID > 0`` a linear classifier is created that
    maps an embedding to ``nID`` outputs, i.e. the embedding-to-track-ID
    classification layer described in the article.
    """

    def __init__(self, cfg_dict, nID=0, test_emb=False):
        super(Darknet, self).__init__()
        if isinstance(cfg_dict, str):
            cfg_dict = parse_model_cfg(cfg_dict)
        self.module_defs = cfg_dict
        net_def = self.module_defs[0]
        net_def['nID'] = nID  # written into the first definition entry
        self.img_size = [int(net_def['width']), int(net_def['height'])]
        self.emb_dim = int(net_def['embedding_dim'])
        self.hyperparams, self.module_list = create_modules(self.module_defs)
        self.loss_names = ['loss', 'box', 'conf', 'id', 'nT']
        self.losses = self._zero_losses()
        self.test_emb = test_emb
        # Embedding classifier; its output size is nID.
        if nID > 0:
            self.classifier = nn.Linear(self.emb_dim, nID)
        else:
            self.classifier = None

    def _zero_losses(self):
        """Return a fresh ordered mapping with every loss name set to 0."""
        return OrderedDict((name, 0) for name in self.loss_names)

    @staticmethod
    def _trim_targets(targets, targets_len):
        """Cut each image's target array down to its recorded length."""
        return [targets[idx][:int(length)] for idx, length in enumerate(targets_len)]

    def forward(self, x, targets=None, targets_len=None):
        """Run the network.

        Returns:
            Training (targets given and not ``test_emb``): the summed loss of
            the yolo layers and a CUDA tensor of the accumulated loss values.
            ``test_emb``: yolo outputs concatenated along dim 0.
            Otherwise: yolo outputs concatenated along dim 1.
        """
        self.losses = self._zero_losses()
        is_training = (targets is not None) and (not self.test_emb)
        feature_maps = []  # output of every module, indexed by route/shortcut
        head_outputs = []  # outputs of the yolo layers only

        for module_def, module in zip(self.module_defs, self.module_list):
            kind = module_def['type']
            if kind in ('convolutional', 'upsample', 'maxpool'):
                x = module(x)
            elif kind == 'route':
                sources = [int(tok) for tok in module_def['layers'].split(',')]
                if len(sources) == 1:
                    x = feature_maps[sources[0]]
                else:
                    x = torch.cat([feature_maps[s] for s in sources], 1)
            elif kind == 'shortcut':
                x = feature_maps[-1] + feature_maps[int(module_def['from'])]
            elif kind == 'yolo':
                head = module[0]
                if is_training:
                    # Loss mode: the head returns the loss followed by its scalar parts.
                    targets = self._trim_targets(targets, targets_len)
                    x, *parts = head(x, self.img_size, targets, self.classifier)
                    for name, value in zip(self.loss_names, parts):
                        self.losses[name] += value
                elif self.test_emb:
                    if targets is not None:
                        targets = self._trim_targets(targets, targets_len)
                    x = head(x, self.img_size, targets, self.classifier, self.test_emb)
                else:
                    # Detection mode.
                    x = head(x, self.img_size)
                head_outputs.append(x)
            feature_maps.append(x)

        if is_training:
            self.losses['nT'] /= 3
            squeezed = [o.squeeze() for o in head_outputs]
            loss_values = torch.Tensor(list(self.losses.values())).cuda()
            return sum(squeezed), loss_values
        if self.test_emb:
            return torch.cat(head_outputs, 0)
        return torch.cat(head_outputs, 1)



class JDETracker(object):
    """Tracker state: the detection/embedding model, track lists and a Kalman filter."""

    def __init__(self, opt, frame_rate=30):
        """Load the model from ``opt.weights`` and initialise the track containers.

        Args:
            opt: Options object; ``cfg``, ``weights``, ``conf_thres`` and
                ``track_buffer`` are read here.
            frame_rate: Video frame rate; scales ``opt.track_buffer`` relative
                to 30 fps.
        """
        self.opt = opt

        # Model with a fixed identity count of 14455; weights are loaded on
        # the CPU first (non-strict), then the model is moved to the GPU and
        # switched to eval mode.
        self.model = Darknet(opt.cfg, nID=14455)
        checkpoint = torch.load(opt.weights, map_location='cpu')
        self.model.load_state_dict(checkpoint['model'], strict=False)
        self.model.cuda()
        self.model.eval()

        # Containers for tracks in their different states.
        self.tracked_stracks = []  # type: list[STrack]
        self.lost_stracks = []  # type: list[STrack]
        self.removed_stracks = []  # type: list[STrack]

        self.frame_id = 0
        self.det_thresh = opt.conf_thres
        buffer_frames = int(frame_rate / 30.0 * opt.track_buffer)
        self.buffer_size = buffer_frames
        self.max_time_lost = buffer_frames

        self.kalman_filter = KalmanFilter()

可以看到代码中使用的nID为14455,这里定义了一些存储不同状态轨迹的容器(轨迹类也在这个文件中,class STrack),以及KF卡尔曼滤波器。跟踪过程如下

        with torch.no_grad():pred = self.model(im_blob)# pred is tensor of all the proposals (default number of proposals: 54264). Proposals have information associated with the bounding box and embeddingspred = pred[pred[:, :, 4] > self.opt.conf_thres]     # [p_box, p_conf, p_cls, p_emb] 第四位为conf conf阈值判断# pred now has lesser number of proposals. Proposals rejected on basis of object confidence scoreif len(pred) > 0:dets = non_max_suppression(pred.unsqueeze(0), self.opt.conf_thres, self.opt.nms_thres)[0].cpu()     # 非极大值抑制# Final proposals are obtained in dets. Information of bounding box and embeddings also included# Next step changes the detection scalesscale_coords(self.opt.img_size, dets[:, :4], img0.shape).round()'''Detections is list of (x1, y1, x2, y2, object_conf, class_score, class_pred)'''# class_pred is the embeddings.detections = [STrack(STrack.tlbr_to_tlwh(tlbrs[:4]), tlbrs[4], f.numpy(), 30) for(tlbrs, f) in zip(dets[:, :5], dets[:, 6:])]else:detections = []


    ''' Add newly detected tracklets to tracked_stracks'''unconfirmed = []tracked_stracks = []  # type: list[STrack]for track in self.tracked_stracks:if not track.is_activated:# previous tracks which are not active in the current frame are added in unconfirmed listunconfirmed.append(track)# print("Should not be here, in unconfirmed")else:# Active tracks are added to the local list 'tracked_stracks'tracked_stracks.append(track)


''' Step 2: First association, with embedding'''# Combining currently tracked_stracks and lost_stracksstrack_pool = joint_stracks(tracked_stracks, self.lost_stracks)# Predict the current location with KFSTrack.multi_predict(strack_pool, self.kalman_filter)dists = matching.embedding_distance(strack_pool, detections)# dists = matching.gate_cost_matrix(self.kalman_filter, dists, strack_pool, detections)dists = matching.fuse_motion(self.kalman_filter, dists, strack_pool, detections)# The dists is the list of distances of the detection with the tracks in strack_poolmatches, u_track, u_detection = matching.linear_assignment(dists, thresh=0.7)# The matches is the array for corresponding matches of the detection with the corresponding strack_pool


def embedding_distance(tracks, detections, metric='cosine'):
    """Build the appearance cost matrix between tracks and detections.

    :param tracks: list[STrack]; each item's ``smooth_feat`` is used.
    :param detections: list[BaseTrack]; each item's ``curr_feat`` is used.
    :param metric: accepted for interface compatibility. NOTE(review): it is
        not forwarded to ``cdist``, so the distance is ``cdist``'s default
        (Euclidean); the upstream comment indicates the features are
        normalized.
    :return: cost_matrix np.ndarray of shape (len(tracks), len(detections))
    """
    # np.float was removed in NumPy 1.24; np.float64 is the same dtype.
    cost_matrix = np.zeros((len(tracks), len(detections)), dtype=np.float64)
    if cost_matrix.size == 0:
        return cost_matrix
    det_features = np.asarray([det.curr_feat for det in detections], dtype=np.float64)
    track_features = np.asarray([track.smooth_feat for track in tracks], dtype=np.float64)
    # Clamp at zero to guard against tiny negative values.
    return np.maximum(0.0, cdist(track_features, det_features))


def fuse_motion(kf, cost_matrix, tracks, detections, only_position=False, lambda_=0.98):
    """Gate and blend an appearance cost matrix with Kalman gating distances.

    For every track row, entries whose gating distance exceeds the chi-square
    threshold are set to infinity, then the row becomes
    ``lambda_ * cost + (1 - lambda_) * gating_distance``. The matrix is
    modified in place and also returned.

    :param kf: object providing ``gating_distance``
    :param cost_matrix: np.ndarray with one row per track and one column per detection
    :param tracks: items providing ``mean`` and ``covariance``
    :param detections: items providing ``to_xyah()``
    :param only_position: selects the 2-dimensional gate instead of the 4-dimensional one
    :param lambda_: weight of the appearance cost in the blend
    :return: the same ``cost_matrix`` object
    """
    if cost_matrix.size == 0:
        return cost_matrix

    if only_position:
        dof = 2
    else:
        dof = 4
    limit = kalman_filter.chi2inv95[dof]
    det_xyah = np.asarray([det.to_xyah() for det in detections])

    for row, track in enumerate(tracks):
        maha = kf.gating_distance(
            track.mean, track.covariance, det_xyah, only_position, metric='maha')
        too_far = maha > limit
        cost_matrix[row, too_far] = np.inf
        cost_matrix[row] = lambda_ * cost_matrix[row] + (1 - lambda_) * maha
    return cost_matrix


def linear_assignment(cost_matrix, thresh):
    """Solve the track/detection assignment with ``lap.lapjv``.

    :param cost_matrix: np.ndarray, rows are tracks and columns are detections
    :param thresh: passed to ``lapjv`` as ``cost_limit``
    :return: ``(matches, unmatched_a, unmatched_b)`` where ``matches`` holds
        ``[row, column]`` pairs and the other two hold the unassigned row and
        column indices. For an empty matrix, ``matches`` has shape (0, 2) and
        every row / column index is returned as unmatched (as tuples).
    """
    if cost_matrix.size == 0:
        all_rows = tuple(range(cost_matrix.shape[0]))
        all_cols = tuple(range(cost_matrix.shape[1]))
        return np.empty((0, 2), dtype=int), all_rows, all_cols

    # Jonker-Volgenant assignment; a negative entry marks "unassigned".
    _, col_of_row, row_of_col = lap.lapjv(cost_matrix, extend_cost=True, cost_limit=thresh)
    pairs = [[r, c] for r, c in enumerate(col_of_row) if c >= 0]
    unmatched_a = np.where(col_of_row < 0)[0]
    unmatched_b = np.where(row_of_col < 0)[0]
    return np.asarray(pairs), unmatched_a, unmatched_b

再对未匹配的track和detection使用IoU distance进行第二次匹配。


网络结构简单,主要是在YOLOv3预测头后面加了一个embedding学习分支。对比tracking by detection,该网络同时输出图像中的检测框位置和检测框内物体的embedding,从而加速MOT的速度。但JDE只是同时输出了检测框和embedding信息,后面还要进行匹配,其实还是两阶段的。看到有大佬分析认为,将detection和ReID结合在同一个网络中的做法与分别去做相比仍然存在一定的差距:detection本质上需要category(类别)特征,而ReID需要identity(身份)特征,同一个网络很难同时得到较好的class特征和identity特征。


One Shot Multi-Object Tracking Overview
多目标跟踪算法(JDE)Towards Real-Time Multi-Object Tracking训练方法

