import pymysql
from datetime import datetime
import pandas as pd # pandas库
import numpy as np # numpy库
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf # acf和pacf展示库
from statsmodels.tsa.stattools import adfuller # adf检验库
from statsmodels.stats.diagnostic import acorr_ljungbox # 随机性检验库
from statsmodels.tsa.arima_model import ARMA # ARMA库
import matplotlib.pyplot as plt # matplotlib图形展示库
import prettytable # 导入表格库
# 由于pandas判断周期性时会出现warning,这里忽略提示
import warnings
warnings.filterwarnings('ignore')
def line_mysql():
conn = pymysql.connect(
host='118.31.19.98',
user='cov',
password='cov2019',
db='cov',
charset='utf8'
)
# sql = """create table xl(title varchar(20) , num int(10) ) """
# cur.execute(sql)
cur = conn.cursor()
return cur
cur = line_mysql()
sql = "select * from history"
cur.execute(sql)
data = cur.fetchall()
sum = {}
for i in data[7::]:
j = datetime(i[0].year, i[0].month, i[0].day)
sum[j] = i[2]
cur.close()
dic = pd.DataFrame(pd.Series(sum), columns=['add_num'])
dic = dic.reset_index().rename(columns={'index': 'time'})
dic['time'] =dic['time'].apply(lambda x: x.strftime('%Y-%m-%d'))
ts_data = dic['add_num'].astype('float32')
# 多次用到的表格
def pre_table(table_name, table_rows):
'''
:param table_name: 表格名称,字符串列表
:param table_rows: 表格内容,嵌套列表
:return: 展示表格对象
'''
table = prettytable.PrettyTable() # 创建表格实例
table.field_names = table_name # 定义表格列名
for i in table_rows: # 循环读多条数据
table.add_row(i) # 增加数据
return table
# 稳定性(ADF)检验
def adf_val(ts, ts_title, acf_title, pacf_title):
'''
:param ts: 时间序列数据,Series类型
:param ts_title: 时间序列图的标题名称,字符串
:param acf_title: acf图的标题名称,字符串
:param pacf_title: pacf图的标题名称,字符串
:return: adf值、adf的p值、三种状态的检验值
'''
plt.figure()
plt.plot(ts) # 时间序列图
plt.title(ts_title) # 时间序列标题
# plt.show()
# plot_acf(ts, lags=20, title=acf_title).show() # 自相关检测
# plot_pacf(ts, lags=20, title=pacf_title).show() # 偏相关检测
adf, pvalue, usedlag, nobs, critical_values, icbest = adfuller(ts) # 稳定性(ADF)检验
table_name = ['adf', 'pvalue', 'usedlag', 'nobs', 'critical_values', 'icbest'] # 表格列名列表
table_rows = [[adf, pvalue, usedlag, nobs, critical_values, icbest]] # 表格行数据,嵌套列表
adf_table = pre_table(table_name, table_rows) # 获得平稳性展示表格对象
print ('stochastic score') # 打印标题
print (adf_table) # 打印展示表格
return adf, pvalue, critical_values, # 返回adf值、adf的p值、三种状态的检验值
# 白噪声(随机性)检验
def acorr_val(ts):
'''
:param ts: 时间序列数据,Series类型
:return: 白噪声检验的P值和展示数据表格对象
'''
lbvalue, pvalue = acorr_ljungbox(ts, lags=1) # 白噪声检验结果
table_name = ['lbvalue', 'pvalue'] # 表格列名列表
table_rows = [[lbvalue, pvalue]] # 表格行数据,嵌套列表
acorr_ljungbox_table = pre_table(table_name, table_rows) # 获得白噪声检验展示表格对象
print ('stationarity score') # 打印标题
print (acorr_ljungbox_table) # 打印展示表格
return pvalue # 返回白噪声检验的P值和展示数据表格对象
##### 原始数据检验 ####
# 稳定性检验
adf, pvalue1, critical_values = adf_val(ts_data, 'raw time series', 'raw acf', 'raw pacf')
# 白噪声检验
pvalue2 = acorr_val(ts_data)
ts = np.log(ts_data).replace(float('-inf'),0)
ts.values
# 数据平稳处理
def get_best_log(ts, max_log=5, rule1=True, rule2=True):
'''
:param ts: 时间序列数据,Series类型
:param max_log: 最大log处理的次数,int型
:param rule1: rule1规则布尔值,布尔型
:param rule2: rule2规则布尔值,布尔型
:return: 达到平稳处理的最佳次数值和处理后的时间序列
'''
if rule1 and rule2: # 如果两个规则同时满足
return 0, ts # 直接返回0和原始时间序列数据
else: # 只要有一个规则不满足
for i in range(1, max_log): # 循环做log处理
ts = np.log(ts).replace(float('-inf'),0) # log处理
adf, pvalue1, usedlag, nobs, critical_values, icbest = adfuller(ts, store=False, regresults=False) # 稳定性(ADF)检验
lbvalue, pvalue2 = acorr_ljungbox(ts, lags=1) # 白噪声(随机性)检验
rule_1 = (adf < critical_values['1%'] and adf < critical_values['5%'] and adf < critical_values[
'10%'] and pvalue1 < 0.01) # 稳定性(ADF)检验规则
rule_2 = (pvalue2 < 0.05) # 白噪声(随机性)规则
rule_3 = (i < 5)
if rule_1 and rule_2 and rule_3: # 如果同时满足条件
print ('The best log n is: {0}'.format(i)) # 打印输出最佳次数
return i, ts # 返回最佳次数和处理后的时间序列
# 还原经过平稳处理的数据
def recover_log(ts, log_n):
'''
:param ts: 经过log方法平稳处理的时间序列,Series类型
:param log_n: log方法处理的次数,int型
:return: 还原后的时间序列
'''
for i in range(1, log_n + 1): # 循环多次
ts = np.exp(ts) # log方法还原
return ts # 返回时间序列
#### 对时间序列做稳定性处理 #####
# 稳定性检验规则
rule1 = (adf < critical_values['1%'] and adf < critical_values['5%'] and adf < critical_values[
'10%'] and pvalue1 < 0.01)
# 白噪声检验的规则
rule2 = (pvalue2[0,] < 0.05)
# 使用log进行稳定性处理
log_n, ts_data = get_best_log(ts_data, max_log=5, rule1=rule1, rule2=rule2)
#### 稳定后数据进行检验 ####
# 稳定性检验
adf, pvalue1, critical_values = adf_val(ts_data, 'final time series', 'final acf', 'final pacf')
# 白噪声检验
pvalue2 = acorr_val(ts_data)
# arma最优模型训练
def arma_fit(ts):
'''
:param ts: 时间序列数据,Series类型
:return: 最优状态下的p值、q值、arma模型对象、pdq数据框和展示参数表格对象
'''
max_count = int(len(ts) / 10) # 最大循环次数最大定义为记录数的10%
bic = float('inf') # 初始值为正无穷
tmp_score = [] # 临时p、q、aic、bic和hqic的值的列表
print ('each p/q traning record') # 打印标题
print('p q aic bic hqic')
for tmp_p in range(max_count + 1): # p循环max_count+1次
for tmp_q in range(max_count + 1): # q循环max_count+1次
model = ARMA(ts, order=(tmp_p, tmp_q)) # 创建ARMA模型对象
try:
# ARMA模型训练 disp不打印收敛信息 method条件平方和似然度最大化
results_ARMA = model.fit(disp=-1, method='css')
except:
continue # 遇到报错继续
finally:
tmp_aic = results_ARMA.aic # 模型的获得aic
tmp_bic = results_ARMA.bic # 模型的获得bic
tmp_hqic = results_ARMA.hqic # 模型的获得hqic
print('{:2d}|{:2d}|{:.8f}|{:.8f}|{:.8f}'.format(tmp_p, tmp_q, tmp_aic, tmp_bic, tmp_hqic))
tmp_score.append([tmp_p, tmp_q, tmp_aic, tmp_bic, tmp_hqic]) # 追加每个模型的训练参数和结果
if tmp_bic < bic: # 如果模型bic小于最小值,那么获得最优模型ARMA的�
评论1