#!/usr/bin/env python
import argparse
import logging
import os
import subprocess
import time
import numpy as np
from pandas import read_csv
from matplotlib import rcParams
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
from matplotlib.gridspec import GridSpec
from scipy.ndimage.filters import gaussian_filter1d
import matplotlib.colors as colors
from scipy.optimize import minimize, basinhopping
from six.moves import input as sinput
# ----------------------------------------------------------------------------------
# "THE BEER-WARE LICENSE" (Revision 42):
# <[email protected]> wrote this file. As long as you retain this notice you
# can do whatever you want with this stuff. If we meet some day, and you think
# this stuff is worth it, you can buy me a beer in return. Florian Melsheimer
# ----------------------------------------------------------------------------------
Version = 'PID-Analyzer 0.52'
LOG_MIN_BYTES = 500000
class Trace:
framelen = 1. # length of each single frame over which to compute response
resplen = 0.5 # length of respose window
cutfreq = 25. # cutfreqency of what is considered as input
tuk_alpha = 1.0 # alpha of tukey window, if used
superpos = 16 # sub windowing (superpos windows in framelen)
threshold = 500. # threshold for 'high input rate'
noise_framelen = 0.3 # window width for noise analysis
noise_superpos = 16 # subsampling for noise analysis windows
def __init__(self, data):
self.data = data
self.input = self.equalize(data['time'], self.pid_in(data['p_err'], data['gyro'], data['P']))[1] # /20.
self.data.update({'input': self.pid_in(data['p_err'], data['gyro'], data['P'])})
self.equalize_data()
self.name = self.data['name']
self.time = self.data['time']
self.dt=self.time[0]-self.time[1]
self.input = self.data['input']
#enable this to generate artifical gyro trace with known system response
#self.data['gyro']=self.toy_out(self.input, delay=0.01, mode='normal')####
self.gyro = self.data['gyro']
self.throttle = self.data['throttle']
self.throt_hist, self.throt_scale = np.histogram(self.throttle, np.linspace(0, 100, 101, dtype=np.float64), normed=True)
self.flen = self.stepcalc(self.time, Trace.framelen) # array len corresponding to framelen in s
self.rlen = self.stepcalc(self.time, Trace.resplen) # array len corresponding to resplen in s
self.time_resp = self.time[0:self.rlen]-self.time[0]
self.stacks = self.winstacker({'time':[],'input':[],'gyro':[], 'throttle':[]}, self.flen, Trace.superpos) # [[time, input, output],]
self.window = np.hanning(self.flen) #self.tukeywin(self.flen, self.tuk_alpha)
self.spec_sm, self.avr_t, self.avr_in, self.max_in, self.max_thr = self.stack_response(self.stacks, self.window)
self.low_mask, self.high_mask = self.low_high_mask(self.max_in, self.threshold) #calcs masks for high and low inputs according to threshold
self.toolow_mask = self.low_high_mask(self.max_in, 20)[1] #mask for ignoring noisy low input
self.resp_sm = self.weighted_mode_avr(self.spec_sm, self.toolow_mask, [-1.5,3.5], 1000)
self.resp_quality = -self.to_mask((np.abs(self.spec_sm -self.resp_sm[0]).mean(axis=1)).clip(0.5-1e-9,0.5))+1.
# masking by setting trottle of unwanted traces to neg
self.thr_response = self.hist2d(self.max_thr * (2. * (self.toolow_mask*self.resp_quality) - 1.), self.time_resp,
(self.spec_sm.transpose() * self.toolow_mask).transpose(), [101, self.rlen])
self.resp_low = self.weighted_mode_avr(self.spec_sm, self.low_mask*self.toolow_mask, [-1.5,3.5], 1000)
if self.high_mask.sum()>0:
self.resp_high = self.weighted_mode_avr(self.spec_sm, self.high_mask*self.toolow_mask, [-1.5,3.5], 1000)
self.noise_winlen = self.stepcalc(self.time, Trace.noise_framelen)
self.noise_stack = self.winstacker({'time':[], 'gyro':[], 'throttle':[], 'd_err':[], 'debug':[]},
self.noise_winlen, Trace.noise_superpos)
self.noise_win = np.hanning(self.noise_winlen)
self.noise_gyro = self.stackspectrum(self.noise_stack['time'],self.noise_stack['throttle'],self.noise_stack['gyro'], self.noise_win)
self.noise_d = self.stackspectrum(self.noise_stack['time'], self.noise_stack['throttle'], self.noise_stack['d_err'], self.noise_win)
self.noise_debug = self.stackspectrum(self.noise_stack['time'], self.noise_stack['throttle'], self.noise_stack['debug'], self.noise_win)
if self.noise_debug['hist2d'].sum()>0:
## mask 0 entries
thr_mask = self.noise_gyro['throt_hist_avr'].clip(0,1)
self.filter_trans = np.average(self.noise_gyro['hist2d'], axis=1, weights=thr_mask)/\
np.average(self.noise_debug['hist2d'], axis=1, weights=thr_mask)
else:
self.filter_trans = self.noise_gyro['hist2d'].mean(axis=1)*0.
@staticmethod
def low_high_mask(signal, threshold):
low = np.copy(signal)
low[low <=threshold] = 1.
low[low > threshold] = 0.
high = -low+1.
if high.sum() < 10: # ignore high pinput that is too short
high *= 0.
return low, high
def to_mask(self, clipped):
clipped-=clipped.min()
clipped/=clipped.max()
return clipped
def pid_in(self, pval, gyro, pidp):
pidin = gyro + pval / (0.032029 * pidp) # 0.032029 is P scaling factor from betaflight
return pidin
def rate_curve(self, rcin, inmax=500., outmax=800., rate=160.):
### an estimated rate curve. not used.
expoin = (np.exp((rcin - inmax) / rate) - np.exp((-rcin - inmax) / rate)) * outmax
return expoin
def calc_delay(self, time, trace1, trace2):
### minimizes trace1-trace2 by shifting trace1
tf1 = interp1d(time[2000:-2000], trace1[2000:-2000], fill_value=0., bounds_error=False)
tf2 = interp1d(time[2000:-2000], trace2[2000:-2000], fill_value=0., bounds_error=False)
fun = lambda x: ((tf1(time - x*0.5) - tf2(time+ x*0.5)) ** 2).mean()
shift = minimize(fun, np.array([0.01])).x[0]
steps = np.round(shift / (time[1] - time[0]))
return {'time':shift, 'steps':int(steps)}
def tukeywin(self, len, alpha=0.5):
### makes tukey widow for envelopig
M = len
n = np.arange(M - 1.) #
if alpha <= 0:
return np.ones(M) # rectangular window
elif alpha >= 1:
return np.hanning(M)
# Normal case
x = np.linspace(0, 1, M, dtype=np.float64)
w = np.ones(x.shape)
# first condition 0 <= x < alpha/2
first_condition = x < alpha / 2
w[first_condition] = 0.5 * (1 + np.cos(2 * np.pi / alpha * (x[first_condition] - alpha / 2)))
# second condition already taken care of
# third condition 1 - alpha / 2 <= x <= 1
third_condition = x >= (1 - alpha / 2)
w[third_condition] = 0.5 * (1 + np.cos(2 * np.pi / alpha * (x[third_condition] - 1 + alpha / 2)))
return w
def toy_out(self, inp, delay=0.01, length=0.01, noise=5., mode='normal', sinfreq=100.):
# generates artificial output for benchmarking
freq= 1./(self.time[1]-self.time[0])
toyresp = np.zeros(int((delay+length)*freq))
toyresp[int((delay)*freq):]=1.
toyresp/=toyresp.sum()
toyout = np.convolve(inp, toyresp, mode='full')[:len(inp)]#*0.9
if mode=='normal':
noise_sig = (np.random.random_sample(len(toyout))-0.5)*noise
elif mode=='sin':
noise_sig = (np.sin(2.*n
评论0