# -*- coding: utf-8 -*-
from multiprocessing import Process, Pool
from functools import partial
import logging
import time, datetime
import os
import pandas as pd
import json
import pickle
#import logger
from flask import (
Blueprint,
Response,
jsonify,
request,
)
import backtest
from bt_utils import (evaluate_expression,
DEFAULT_START_DATE,
DEFAULT_END_DATE,
DEFAULT_INSTRUMENT,
DEFAULT_BENCHMARK,
DEFAULT_HISTORY,
DEFAULT_MARKET_STATUS)
# logging helper first; the blueprint itself is created below, after the logger
def get_logger(logger_name, level=logging.INFO):
    """Return a named logger with a single stream handler attached.

    Calling this repeatedly with the same name reuses the already-configured
    logger, so no duplicate handlers accumulate.

    :param logger_name: name passed to ``logging.getLogger``
    :param level: level applied to the logger (default ``logging.INFO``)
    :return: the configured :class:`logging.Logger`
    """
    log = logging.getLogger(logger_name)
    log.setLevel(level)
    # Attach a handler only on first configuration of this logger name.
    if not log.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            '%(asctime)s %(funcName)s %(lineno)s %(levelname)s %(message)s'))
        log.addHandler(handler)
    log.debug(logger_name)
    return log
# Application logger configured via get_logger (stream handler, INFO level).
new_logger=get_logger('new_logger')
# Blueprint mounted at the site root; the route handlers below attach to it.
bp = Blueprint('view', __name__, url_prefix='/')
# Module-level logger; no handler is attached here, so its output depends on
# the logging configuration of the embedding application.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def build_json_resp(result=None, status=200, reason=None, content=None, rest_framework="flask"):
    """Serialize a result/reason/content triple into a JSON HTTP response.

    :param result: legacy-mode payload; when not None the body is always the
        envelope ``{'result': ..., 'reason': ..., 'content': ...}``,
        regardless of status.
    :param status: HTTP status code (default 200).
    :param reason: error description; used on its own only for non-200
        responses without a legacy ``result``.
    :param content: success payload; passed through as the whole body only
        for 200 responses without a legacy ``result``.
    :param rest_framework: ``"flask"`` returns a :class:`flask.Response`
        with an ``application/json`` mimetype; any other value returns the
        raw JSON string.
    :return: ``flask.Response`` or ``str`` depending on ``rest_framework``.
    """
    # The original duplicated the 'result is not None' envelope in both the
    # error and success arms; it is status-independent, so hoist it out.
    if result is not None:
        # old mode: fixed envelope, identical for success and failure
        rst = {'result': result, 'reason': reason, 'content': content}
    elif status != 200:
        # error without the legacy envelope: expose only the reason, if any
        rst = {'reason': reason} if reason is not None else {}
    else:
        # success: hand the content through unchanged (empty dict fallback)
        rst = content if content is not None else {}
    if rest_framework == "flask":
        return Response(
            response=json.dumps(rst),
            status=status,
            mimetype="application/json"
        )
    return json.dumps(rst)
@bp.route('/<input_type>', methods=['POST'])
def calculate_expression_fitness(input_type):
    """
    fitness calculation restful entrance

    POST body is JSON (possibly double-encoded as a string by some callers):

    * ``input_type == "expression"``: expects keys ``strategy_name``,
      ``mkt_status``, ``buy_size`` (default 20), ``expression`` and
      ``instrument_date_info`` (a JSON string whose ``date`` field holds
      epoch-milliseconds timestamps).
    * ``input_type == "prediction"``: expects key ``prediction`` naming a
      pickle file that contains a dict with ``strategy_name``,
      ``mkt_status``, ``buy_size`` and ``prediction``.

    Returns a 200 JSON response with the backtest stats, strategy name and
    duration on success; a 500 JSON response with a ``reason`` otherwise.
    """
    logger.debug("Get POST Request to get fitness")
    # extract input argument
    kwargs = json.loads(request.data.decode('utf-8'))
    print("kwargs={}/type {}/input_type {}".format(kwargs,
                                                   request.data.decode('utf-8'),
                                                   input_type))
    # Some callers double-encode the payload, so json.loads yields a str.
    # SECURITY: eval() here executes arbitrary expressions from untrusted
    # request data -- should be json.loads / ast.literal_eval instead.
    if isinstance(kwargs, str):
        kwargs = eval(kwargs)
    print("kwargs = {}/type {}".format(kwargs, type(kwargs)))
    if input_type == "expression":
        strategy_name = kwargs.get("strategy_name", "None")
        mkt_status = kwargs.get("mkt_status")
        buy_size = kwargs.get("buy_size", 20)
        expression = kwargs.get("expression")
        instrument_date_info = kwargs.get("instrument_date_info")
        instrument_date_info = json.loads(instrument_date_info)
        # BUGFIX: json.loads returns a plain dict, so the 'date' field must be
        # read with item access; the previous attribute access
        # (instrument_date_info.date) raised AttributeError.
        instrument_date_info['date'] = pd.to_datetime(instrument_date_info['date'], unit='ms')
        prediction = evaluate_expression(expression)
    elif input_type == "prediction":
        prediction_file = kwargs.get("prediction")
        try:
            with open(prediction_file, "rb") as f:
                # NOTE(review): unpickling a caller-supplied path is only safe
                # if the file system contents are trusted.
                prediction = pickle.load(f)
        except Exception as e:
            print("task failed: load prediction file : {} failed! Error msg: {}".format(
                prediction_file, e))
            return build_json_resp(
                reason="task failed: load prediction file : {} failed! Error msg: {}".format(
                    prediction_file, e
                ),
                status=500,
            )
        strategy_name = prediction.get("strategy_name", "None")
        mkt_status = prediction.get("mkt_status")
        buy_size = prediction.get("buy_size", 20)
        prediction = prediction.get("prediction")
    else:
        print("task failed: input_type = {} not in [expression|prediction]".format(input_type))
        return build_json_resp(
            reason="task failed: input_type = {} not in [expression|prediction]".format(input_type),
            status=500,
        )
    # calculate fitness
    beg = time.time()
    new_logger.info('view starting backtest : ---------------- ')
    new_logger.info("strategy_name:"+str(strategy_name))
    new_logger.info("y_prediction:"+str(prediction.head(10)))
    new_logger.info("ms_code:"+str(mkt_status))
    new_logger.info("start_date:"+str(DEFAULT_START_DATE))
    new_logger.info("buy_size:"+str(buy_size))
    new_logger.info("ms_seq:"+str(DEFAULT_MARKET_STATUS))
    new_logger.info("history_df:"+str(DEFAULT_HISTORY.head(10)))
    try:
        # NOTE(review): assumes mkt_status is a str; a missing/None value
        # raises TypeError in the concatenation below -- confirm callers.
        stat, result = backtest.do(strategy_name = strategy_name + '-' + mkt_status,
                                   y_prediction = prediction,
                                   instruments = DEFAULT_INSTRUMENT,
                                   ms_seq = DEFAULT_MARKET_STATUS,
                                   ms_code = mkt_status,
                                   start_date = DEFAULT_START_DATE,
                                   end_date = DEFAULT_END_DATE,
                                   buy_size = buy_size,
                                   benchmark = DEFAULT_BENCHMARK,
                                   history_df = DEFAULT_HISTORY,
                                   push_result = False)
        # Persist the detailed result frame to disk, respond with stats only.
        result.to_pickle(strategy_name)
        result = stat
    except Exception as e:
        print("fail to calculate fitness. Error: {}".format(e))
        logger.exception("fail to calculate fitness")
        result = e
    dur_work = time.time() - beg
    # stage result back
    if not isinstance(result, Exception):
        return build_json_resp(status=200, content={
            "result": result,
            "strategy_name": strategy_name,
            "dur": dur_work
        })
    else:
        print("task failed in {:.6f} (work) with exception {}".format(
            dur_work,
            str(result),
        ))
        return build_json_resp(
            reason="task failed in {:.6f} (work) with exception {}".format(
                dur_work,
                str(result),
            ),
            status=500,
        )
@bp.route('/calfit', methods=['POST'])
def calculate_fitness():
'''
计算fitness并返回
:return:
'''
new_logger.info("Get POST Request to get fitness")
# 入参解析
try:
kwargs = json.loads(request.data.decode('utf-8'))
new_logger.info("kwargs={}".format(kwargs))
# 如果是str,转成dic
if type(kwargs) == str:
kwargs = eval(kwargs)
except Exception as e:
return build_json_resp(
reason="请求入参解析失败! Error msg: {}".format(e),
status=500,
)
# 获取文件数据路径
DATA_DIR = os.environ.get("DATA_DIR", "/data/")
DEFAULT_DATA_FILE = os.environ.get("DATA_FILE", "{}/data.pkl".format(DATA_DIR))
# 读取 data.pkl 数据
with open(DEFAULT_DATA_FILE, 'rb') as f:
data = pickle.load(f)
# 获取对应数据
buy_size = kwargs.get("buy_size")
columns = kwargs.get("columns")
models = kwargs.get("models")
# 将data.pkl按照因子列表取出,并将因子名转换为序号
feature = data[columns].values.T
# 设置进程池为100
pool = Pool(100)
# 计算y值
try:
func_ypred = partial(do_ypred, feature)
y_list = pool.map(func=func_ypred, iterable=models.items())
new_logger.info("y值计算完毕!")
except Exception as e:
return build_json_resp(
reason="y值计算错误! Error msg: {}".format(e),
status=500,
)
# 计算fitness
try:
func_fitness = partial(do_fitness, buy_size)
f