import numpy
import scipy # use numpy if scipy unavailable
import scipy.linalg # use numpy if scipy unavailable
## Copyright (c) 2004-2007, Andrew D. Straw. All rights reserved.
## Copyright (c) 2013, Nico Weber
## Jun 2013: Changed to true classical ransac and optional msac.
## Redistribution and use in source and binary forms, with or without
## modification, are permitted provided that the following conditions are
## met:
## * Redistributions of source code must retain the above copyright
## notice, this list of conditions and the following disclaimer.
## * Redistributions in binary form must reproduce the above
## copyright notice, this list of conditions and the following
## disclaimer in the documentation and/or other materials provided
## with the distribution.
## * Neither the name of the Andrew D. Straw nor the names of its
## contributors may be used to endorse or promote products derived
## from this software without specific prior written permission.
## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
## "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
## LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
## A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
## OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
## SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
## LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
## DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
## THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
## (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
## OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
def ransac(data,model,n,k,t,d,debug=False,return_all=False,msac=True):
"""fit model parameters to data using the RANSAC algorithm
- data - 样本点
- model - 假设模型
- n - 生成模型所需的最小样本点
- k - 最大迭代次数
- t - 阈值:作为判断点满足模型的条件
- d - 拟合较好时,需要的样本点最少的个数,当阈值看待
"""
iterations = 0
bestfit = None
bestcost = numpy.infty
best_inlier_idxs = None
while iterations < k:
#随机选取n个maybe_idxs点,可能是内点,也可能全部是局外点
#所有点作为测试点?
maybe_idxs, test_idxs = random_partition(n, data.shape[0])
maybeinliers = data[maybe_idxs, :]
test_points = data[test_idxs]
#拟合模型
maybemodel = model.fit(maybeinliers)
test_err = model.get_error(test_points, maybemodel)#计算误差,平方和最小
also_idxs = test_idxs[test_err < t] # 满足误差要求的样本点
if debug:
print ('test_err.min()', test_err.min())
print ('test_err.max()', test_err.max())
print ('numpy.mean(test_err)', numpy.mean(test_err))
print ('number of inliers', len(also_idxs))
if len(also_idxs) > d:#若样本点数目大于d
thiscost = (data.shape[0] - n - len(also_idxs)) * t
if msac:
thiscost += numpy.sum(test_err[test_err < t])
if thiscost < bestcost:
if debug:
print ('new best fit',thiscost,bestcost)
bestcost = thiscost
best_inlier_idxs = numpy.concatenate((maybe_idxs, also_idxs))#拼接样本,更新样本点
iterations+=1
if best_inlier_idxs is None:
raise ValueError("did not meet fit acceptance criteria")
bestfit = model.fit(data[best_inlier_idxs,:])
if return_all:
return bestfit, {'inliers':best_inlier_idxs}
else:
return bestfit
def random_partition(n,n_data):
"""return n random rows of data (and also the other len(data)-n rows)"""
all_idxs = numpy.arange( n_data )
numpy.random.shuffle(all_idxs)
idxs1 = all_idxs[:n]
idxs2 = all_idxs[n:]
return idxs1, idxs2
class LinearLeastSquaresModel:
"""linear system solved using linear least squares
最小二乘法求线性解
This class serves as an example that fulfills the model interface
needed by the ransac() function.
"""
def __init__(self,input_columns,output_columns,debug=False):
self.input_columns = input_columns
self.output_columns = output_columns
self.debug = debug
def fit(self, data):
A = numpy.vstack([data[:,i] for i in self.input_columns]).T
B = numpy.vstack([data[:,i] for i in self.output_columns]).T
x,resids,rank,s = scipy.linalg.lstsq(A,B)
return x
def get_error(self, data, model):
A = numpy.vstack([data[:,i] for i in self.input_columns]).T
B = numpy.vstack([data[:,i] for i in self.output_columns]).T
B_fit = scipy.dot(A,model)
err_per_point = numpy.sum((B-B_fit)**2,axis=1) # sum squared error per row
return err_per_point
def test():
# 生成理想数据
n_samples = 500#样本个数
n_inputs = 1#输入变量个数
n_outputs = 1#输出变量个数
#
A_exact = 20*numpy.random.random((n_samples,n_inputs) )#随机生成0-20之间的500个数据
#高斯分布概率密度随机数
perfect_fit = 60*numpy.random.normal(size=(n_inputs,n_outputs) ) # 随机线性度
B_exact = scipy.dot(A_exact,perfect_fit)#y=x*k
assert B_exact.shape == (n_samples,n_outputs)
# add a little gaussian noise (linear least squares alone should handle this well)
A_noisy = A_exact + numpy.random.normal(size=A_exact.shape )
B_noisy = B_exact + numpy.random.normal(size=B_exact.shape )
if 1:
# add some outliers
n_outliers = 100
all_idxs = numpy.arange( A_noisy.shape[0] )#获取索引0-499
numpy.random.shuffle(all_idxs)#打乱
outlier_idxs = all_idxs[:n_outliers]#取打乱后的前100个作为外点
non_outlier_idxs = all_idxs[n_outliers:]#剩下的就不是外点喽
A_noisy[outlier_idxs] = 20*numpy.random.random((n_outliers,n_inputs) )#把局外点的横坐标进行赋值
B_noisy[outlier_idxs] = 50*numpy.random.normal(size=(n_outliers,n_outputs) )#把局外点的纵坐标进行赋值
# setup model
all_data = numpy.hstack( (A_noisy,B_noisy) )
input_columns = range(n_inputs) # the first columns of the array
output_columns = [n_inputs+i for i in range(n_outputs)] # the last columns of the array
debug = False
model = LinearLeastSquaresModel(input_columns,output_columns,debug=debug)
linear_fit,resids,rank,s = scipy.linalg.lstsq(all_data[:,input_columns],
all_data[:,output_columns])
# run RANSAC algorithm
ransac_fit, ransac_data = ransac(all_data,model,
2, 1000, 7e3, 300, # misc. parameters
debug=debug,return_all=True)
if 1:
import pylab
sort_idxs = numpy.argsort(A_exact[:,0])
A_col0_sorted = A_exact[sort_idxs] # maintain as rank-2 array
if 1:
pylab.plot( A_noisy[:,0], B_noisy[:,0], 'k.', label='data' )
pylab.plot( A_noisy[ransac_data['inliers'],0], B_noisy[ransac_data['inliers'],0], 'bx', label='RANSAC data' )
else:
pylab.plot( A_noisy[non_outlier_idxs,0], B_noisy[non_outlier_idxs,0], 'k.', label='noisy data' )
pylab.plot( A_noisy[outlier_idxs,0], B_noisy[outlier_idxs,0], 'r.', label='outlier data' )
pylab.plot( A_col0_sorted[:,0],
numpy.dot(A_col0_sorted,ransac_fit)[:,0],
label='RANSAC fit' )
pylab.plot(