"""
SORT: A Simple, Online and Realtime Tracker
Copyright (C) 2016-2020 Alex Bewley alex@bewley.ai
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
#!/usr/bin/python
# -*- coding:utf8 -*-
from __future__ import print_function
import os
import numpy as np
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from skimage import io
import glob
import time
import argparse
from kalmanFilter import KalmanFilter
import cv2
import math
np.random.seed(0)
def linear_assignment(cost_matrix):
try:
import lap
_, x, y = lap.lapjv(cost_matrix, extend_cost=True)
return np.array([[y[i], i] for i in x if i >= 0]) #
except ImportError:
from scipy.optimize import linear_sum_assignment
x, y = linear_sum_assignment(cost_matrix)
return np.array(list(zip(x, y)))
def iou_batch(bb_test, bb_gt):
"""
From SORT: Computes IUO between two bboxes in the form [l,t,w,h]
每一行代表一个跟踪框,每一列代表一个检测框,那么每个坐标的意义就是 跟踪框y与检测框x的IOU
"""
bb_gt = np.expand_dims(bb_gt, 0)
bb_test = np.expand_dims(bb_test, 1)
# 这里用到了 maximum 的广播属性,从 44*1 × 1*56 广播到 44*56
xx1 = np.maximum(bb_test[..., 0], bb_gt[..., 0])
yy1 = np.maximum(bb_test[..., 1], bb_gt[..., 1])
xx2 = np.minimum(bb_test[..., 2], bb_gt[..., 2])
yy2 = np.minimum(bb_test[..., 3], bb_gt[..., 3])
w = np.maximum(0., xx2 - xx1)
h = np.maximum(0., yy2 - yy1)
wh = w * h
o = wh / ((bb_test[..., 2] - bb_test[..., 0]) * (bb_test[..., 3] - bb_test[..., 1])
+ (bb_gt[..., 2] - bb_gt[..., 0]) * (bb_gt[..., 3] - bb_gt[..., 1]) - wh)
return (o)
def convert_bbox_to_z(bbox):
"""
Takes a bounding box in the form [x1,y1,x2,y2] and returns z in the form
[x,y,s,r] where x,y is the centre of the box and s is the scale/area and r is
the aspect ratio
"""
w = bbox[2] - bbox[0]
h = bbox[3] - bbox[1]
x = bbox[0] + w / 2.
y = bbox[1] + h / 2.
s = w * h # scale is just area
r = w / float(h)
return np.array([x, y, s, r]).reshape((4, 1))
def convert_x_to_bbox(x, score=None):
"""
Takes a bounding box in the centre form [x,y,s,r] and returns it in the form
[x1,y1,x2,y2] where x1,y1 is the top left and x2,y2 is the bottom right
"""
w = np.sqrt(x[2] * x[3])
h = x[2] / w
if (score == None):
return np.array([x[0] - w / 2., x[1] - h / 2., x[0] + w / 2., x[1] + h / 2.]).reshape((1, 4))
else:
return np.array([x[0] - w / 2., x[1] - h / 2., x[0] + w / 2., x[1] + h / 2., score]).reshape((1, 5))
class KalmanBoxTracker(object):
"""
This class represents the internal state of individual tracked objects observed as bbox.
"""
count = 0
def __init__(self, bbox):
"""
Initialises a tracker using initial bounding box.
"""
# define constant velocity model
# 这里dim_x=9 分别为 x, y, s, r, vx, vy, vs, ax, ay (vs指的是面积变化的速率)
# dim_z=4 分别为 x, y, s, r
self.kf = KalmanFilter(dim_x=9, dim_z=4)
self.kf.F = np.array(
[[1, 0, 0, 0, 1, 0, 0, 0.5, 0],
[0, 1, 0, 0, 0, 1, 0, 0, 0.5],
[0, 0, 1, 0, 0, 0, 1, 0, 0],
[0, 0, 0, 1, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 1, 0, 0, 1, 0],
[0, 0, 0, 0, 0, 1, 0, 0, 1],
[0, 0, 0, 0, 0, 0, 1, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 1, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 1]])
self.kf.H = np.array(
[[1, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 1, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 1, 0, 0, 0, 0, 0]])
self.kf.R[2:, 2:] *= 10.
self.kf.P[4:, 4:] *= 1000. # give high uncertainty to the unobservable initial velocities
self.kf.P *= 10.
self.kf.Q[-1, -1] *= 0.01
self.kf.Q[4:, 4:] *= 0.01
self.kf.x[:4] = convert_bbox_to_z(bbox)
self.time_since_update = 0
self.id = KalmanBoxTracker.count
KalmanBoxTracker.count += 1
self.history = []
self.hits = 0
self.hit_streak = 0
self.age = 0
self.org_box = bbox.copy()
self.is_throw = False
def update(self, bbox):
"""
Updates the state vector with observed bbox.
"""
self.time_since_update = 0
self.history = []
self.hits += 1
self.hit_streak += 1
self.kf.update(convert_bbox_to_z(bbox))
def predict(self):
"""
Advances the state vector and returns the predicted bounding box estimate.
"""
if ((self.kf.x[6] + self.kf.x[2]) <= 0):
self.kf.x[6] *= 0.0
self.kf.predict()
self.age += 1
if (self.time_since_update > 0):
self.hit_streak = 0
self.time_since_update += 1
self.history.append(convert_x_to_bbox(self.kf.x))
return self.history[-1]
def get_state(self):
"""
Returns the current bounding box estimate.
"""
bbox = convert_x_to_bbox(self.kf.x)[0]
x = (bbox[0]+bbox[2])/2 - (self.org_box[0]+self.org_box[2])/2
y = (bbox[1] + bbox[3]) / 2 - (self.org_box[1] + self.org_box[3]) / 2
# 基于x,y轴的位移判断
if math.fabs(x) > 0.5 * (self.org_box[2]-self.org_box[0]+bbox[2]-bbox[0]) and \
y > 2*(self.org_box[3]-self.org_box[1]+bbox[3]-bbox[1]):
self.is_throw = True
# 基于欧式距离的判断
# disdance = math.hypot(x, y)
# if disdance > 2 * (self.org_box[2] - self.org_box[0] + bbox[2] - bbox[0]) and \
# disdance > (self.org_box[3] - self.org_box[1] + bbox[3] - bbox[1]):
# self.is_throw = True
return bbox, self.is_throw
def associate_detections_to_trackers(detections, trackers, iou_threshold=0.3):
"""
Assigns detections to tracked object (both represented as bounding boxes)
Returns 3 lists of matches, unmatched_detections and unmatched_trackers
"""
if (len(trackers) == 0):
return np.empty((0, 2), dtype=int), np.arange(len(detections)), np.empty((0, 5), dtype=int)
iou_matrix = iou_batch(detections, trackers)
if min(iou_matrix.shape) > 0:
# 将大于阈值的置为1,小于阈值的置为0
a = (iou_matrix > iou_threshold).astype(np.int32)
# 如果每个跟踪框只与一个检测框IOU大于阈值,每个检测框只与一个跟踪框IOU大于阈值,则认为跟踪唯一,大于阈值的跟踪框都是正确的
if a.sum(1).max() == 1 and a.sum(0).max() == 1:
matched_indices = np.stack(np.where(a), axis=1)
# 否则需要使用线性任务指派算法,将每个检测框与跟踪框以最小代价匹配起来,这也是为什么 iou_matrix 需要乘以 -1 的原因
else:
matched_indices = linear_assignment(-iou_matrix)
else:
matched_indices = np.empty(shape=(0, 2))
unmatched_detections = []
for d, det in enumerate(detections):
if (d not in matched_indices[:, 0]):
unmatched_detections.append(d)
unmatched_trackers = []
for t, trk in enumerate(trackers):
if (t not