#!/usr/bin/python3
from pathlib import Path
from datetime import date, datetime, timedelta
import os
import sys
import pandas as pd
import json
import shutil
class MyDict(dict):
    """A ``dict`` whose keys can also be read and written as attributes.

    The instance ``__dict__`` is pointed at the mapping itself, so
    ``d.key`` and ``d["key"]`` read and write the same storage.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Share storage between attribute access and item access.
        # NOTE: this makes the instance reference itself (a cycle), so it is
        # reclaimed by the cyclic garbage collector, not refcounting alone.
        self.__dict__ = self
class GenKLine:
def __init__(self, start_date, end_date):
self.logList = list()
self.start_date = start_date
self.end_date = end_date
self.conf = None
self.load_conf()
self.of = None
pass
def load_conf(self, conf_file="config.json"):
    """Load the JSON configuration into ``self.conf``.

    Args:
        conf_file: path of the JSON config file, resolved against the
            current working directory. Defaults to ``"config.json"``.

    The parsed object is wrapped in ``MyDict`` so that keys can be read
    both as ``self.conf["key"]`` and as ``self.conf.key``.
    """
    # JSON text is UTF-8 by specification; don't depend on the platform's
    # locale encoding. "utf-8-sig" also accepts a leading BOM, which
    # json.load would otherwise reject.
    with open(conf_file, encoding="utf-8-sig") as f:
        self.conf = MyDict(json.load(f))
def filter_data(self, df, prod):
    """Keep only the rows of ``df`` that fall inside ``prod``'s trading windows.

    Args:
        df: DataFrame with a ``"Time"`` column of ``"HH:MM:SS"`` strings.
        prod: key into ``self.conf["trade_time"]``.

    Returns:
        ``df`` unchanged when no ``trade_time`` entry exists for ``prod``.
        Otherwise the rows whose ``Time`` lies in at least one window,
        kept once each and in their original order. Each window is a dict
        with ``"start"`` / ``"end"`` strings and is treated as the
        half-open interval ``(start, end]``; when ``start >= end`` the
        window wraps past midnight. An empty window list yields an empty
        frame.
    """
    if "trade_time" not in self.conf or prod not in self.conf["trade_time"]:
        return df
    # OR the windows into one mask instead of concatenating per-window
    # slices: overlapping windows no longer duplicate rows, row order is
    # preserved, and an empty window list no longer crashes pd.concat([]).
    mask = pd.Series(False, index=df.index)
    for window in self.conf["trade_time"][prod]:
        start, end = window["start"], window["end"]
        if start < end:
            in_window = (df["Time"] > start) & (df["Time"] <= end)
        else:
            # Overnight session, e.g. 21:00 -> 02:30.
            in_window = (df["Time"] > start) | (df["Time"] <= end)
        mask |= in_window
    return df[mask]
@staticmethod
def convert_to_double_datetime(dt):
dlt = dt - datetime(1899, 12, 30)
return dlt.total_seconds() / 86400.0
def gen_kline_from_tick(self, tick_df, symbol, k, exg):
    """Aggregate one instrument's ticks into OHLC bars.

    Args:
        tick_df: tick DataFrame with at least the columns ``Datetime``,
            ``LastPrice``, ``Volume``, ``Turnover`` and ``OpenInterest``.
            It is not modified.
        symbol: instrument code written to the ``instrument`` column.
        k: ``'day'`` for daily bars; otherwise the bar length in seconds
            (used as the pandas frequency ``str(k) + "S"``).
        exg: exchange code. For every exchange except CME, HKEX and SGX the
            ``Volume`` / ``Turnover`` columns are differenced tick-to-tick
            before being summed per bar.

    Returns:
        DataFrame indexed by the right edge of each bar with columns
        ``instrument, datetime, open, high, low, close, volume, amount,
        openinterest``. ``datetime`` is the bar time in fractional days
        since 1899-12-30. Bars with any NaN (e.g. bars without ticks) are
        dropped. Bars are also restricted to the ``'all'`` trading windows
        via ``filter_data`` when those are configured.
    """
    freq = "D" if k == 'day' else str(k) + "S"
    # Work on a private copy: assigning into a slice of the caller's frame
    # raises SettingWithCopyWarning. Also, .loc[:, col] = ... would try to
    # store the float/NaN diff in place in the int column.
    tick1 = tick_df[['Datetime', 'LastPrice', 'Volume', 'Turnover', 'OpenInterest']].copy()
    if exg not in ['CME', 'HKEX', 'SGX']:
        # NOTE(review): presumably the other exchanges report cumulative
        # volume/turnover, so per-tick values are recovered by differencing.
        # The first tick becomes NaN and is skipped by sum(). Confirm.
        tick1["Volume"] = tick1["Volume"].diff()
        tick1["Turnover"] = tick1["Turnover"].diff()
    tick1 = tick1.set_index('Datetime')

    tick0 = tick1['LastPrice'].resample(freq, label='right').ohlc()
    tick0['volume'] = tick1['Volume'].resample(freq, label='right').sum().fillna(0).astype("int64")
    tick0['amount'] = tick1['Turnover'].resample(freq, label='right').sum()
    tick0['openinterest'] = tick1['OpenInterest'].resample(freq, label='right').last().fillna(0).astype("int64")
    tick0['instrument'] = symbol
    # Same per-timestamp conversions the old row-wise apply performed. These
    # are cheaper than apply(axis=1) and also valid when there are no bars.
    bar_times = list(tick0.index)
    tick0['datetime'] = [self.convert_to_double_datetime(ts) for ts in bar_times]
    # "HH:MM:SS" strings matched against the trading windows by filter_data.
    tick0['Time'] = [str(ts.time()) for ts in bar_times]
    tick0 = self.filter_data(tick0, 'all')
    tick0 = tick0.dropna()
    tick0 = tick0.sort_index()
    return tick0[["instrument", "datetime", "open", "high", "low", "close", "volume", "amount", "openinterest"]]
def log_str(self, con):
    """Write one message to the open log file as its own line.

    ``init_log()`` must have been called first so that ``self.of`` is an
    open text file. A newline is appended unless ``con`` already ends with
    one; previously consecutive messages ran together on a single line.
    """
    line = con if con.endswith("\n") else con + "\n"
    # A single write(): writelines() on a str issues one write per character.
    self.of.write(line)
def init_log(self, log_file=None):
    """Open the log file that ``log_str`` / ``write_log`` write to.

    Args:
        log_file: path of the log file. Defaults to
            ``log/<start_date>-<end_date>.log``.

    The file is opened in ``'w+'`` mode (truncating any existing content)
    with UTF-8 encoding. Its parent directory is created when missing. A
    handle left open by a previous call is closed first.
    """
    if log_file is None:
        log_file = "log/{}-{}.log".format(self.start_date, self.end_date)
    # Create the directory of the actual target (not always "log/"), and
    # tolerate it already existing or being created concurrently.
    Path(log_file).parent.mkdir(parents=True, exist_ok=True)
    previous = getattr(self, "of", None)
    if previous is not None:
        previous.close()
    self.of = open(log_file, 'w+', encoding='utf-8')
def write_log(self, log_file=None):
    """Write the buffered ``self.logList`` messages to the open log file.

    Messages are joined with ``"\\n"`` (no trailing newline) and written
    with a single call. ``log_file`` is accepted for backward
    compatibility but is not used. The target is the handle that
    ``init_log()`` stored in ``self.of``.
    """
    # writelines() on a str iterates it and issues one write() per
    # character; write() emits the same text in one call.
    self.of.write("\n".join(self.logList))
def get_exch_prod(self, symbol, dt):
    """Resolve a stock symbol to its exchange, product and data paths.

    Symbols whose text starts with ``0``, ``3``, ``123``, ``128`` or ``15``
    map to ``("SZE", "SZA")``; every other symbol maps to ``("SSE", "SHA")``.

    Returns:
        ``(exg, prod, md_path, md_file)`` where ``md_path`` is
        ``<data_path>/<exg>/<prod>/<dt>`` and ``md_file`` is
        ``<md_path>/tick/<symbol>.csv``.
    """
    code = str(symbol)
    shenzhen_prefixes = ("0", "3", "123", "128", "15")
    if code.startswith(shenzhen_prefixes):
        exg, prod = "SZE", "SZA"
    else:
        exg, prod = "SSE", "SHA"
    base = self.conf["data_path"]
    md_path = "{}/{}/{}/{}".format(base, exg, prod, dt)
    md_file = "{}/tick/{}.csv".format(md_path, symbol)
    return exg, prod, md_path, md_file
def gen_kline(self, symbol, exg, prod, md_path, md_file, dt):
    """Build K-line CSV files for one symbol and trading day.

    Reads the tick CSV ``md_file`` (first line skipped, columns named from
    ``self.conf.tick_columns``) and parses a ``Datetime`` column from
    ``TradingDay``, ``Time`` and ``Milliseconds``. Ticks with
    ``LastPrice <= 0.0001`` are dropped. For every cycle in
    ``self.conf["cycles"]`` it writes ``<md_path>/<cycle>/<symbol>.csv``.
    Finally it copies the tick file to ``<md_path>/tick/<symbol>.csv``
    unless a file already exists there. A missing ``md_file`` is logged
    and nothing else happens.

    Args:
        symbol: instrument code, used in output file names.
        exg: exchange code, logged and passed to ``gen_kline_from_tick``.
        prod: product code, only logged.
        md_path: output directory for this day (``str`` or ``Path``).
        md_file: path of the input tick CSV.
        dt: trading day, only logged.
    """
    self.log_str("{} {} {} {} {}".format(datetime.now(), exg, prod, symbol, dt))
    if not os.path.exists(md_file):
        self.log_str("{} 文件不存在".format(md_file))
        return

    tick_columns = list(self.conf.tick_columns)
    md = pd.read_csv(
        md_file,
        dtype={
            'InstrumentID': object},
        skipinitialspace=True, skiprows=1, header=None, usecols=range(0, len(tick_columns)))
    # set_axis(..., inplace=True) was removed in pandas 2.0; assigning the
    # labels works on every pandas version.
    md.columns = tick_columns
    # Build "YYYYMMDD HH:MM:SS.mmm" for all rows at once and parse them with
    # one vectorized call instead of a row-by-row strptime.
    stamp_text = (
        md['TradingDay'].astype(str) + " "
        + md['Time'].astype(str) + "."
        + md['Milliseconds'].map("{:03d}".format)
    )
    md["Datetime"] = pd.to_datetime(stamp_text, format='%Y%m%d %H:%M:%S.%f')
    md1 = md[md.LastPrice > 0.0001]

    # Accept md_path as str or Path. The old `md_path / k` raised TypeError
    # for a str path and for integer cycles.
    out_dir = Path(md_path)
    if len(md1) > 0:
        for k in self.conf["cycles"]:
            md0 = self.gen_kline_from_tick(md1, symbol, k, exg)
            cycle_path = out_dir / str(k)
            cycle_path.mkdir(parents=True, exist_ok=True)
            md0.to_csv(cycle_path / "{}.csv".format(symbol), index=False)
            print('{}{} is updated'.format(symbol, k))

    # Copy the tick file next to the generated bars unless already present.
    out_tick_folder = out_dir / 'tick'
    out_tick_folder.mkdir(parents=True, exist_ok=True)
    out_tick_file = out_tick_folder / "{}.csv".format(symbol)
    if not out_tick_file.exists():
        shutil.copy(md_file, out_tick_file)
def start(self):
data_path = Path(self.conf["data_path"])
save_folder = Path(self.conf["save_path"])
start_dt = datetime.strptime(self.start_date, "%Y%m%d")
end_dt = datetime.strptime(self.end_date, "%Y%m%d")
self.init_log()
trade_date = start_dt
while trade_date <= end_dt:
date_str = trade_date.strftime("%Y%m%d")
print("开始处理日期:{}".format(date_str))
for exg in self.conf['exchanges'].keys():
# if exg not in ['CME', 'HKEX', 'SGX']:
# continue
exch_path = data_path / exg
if not exch_path.exists() or not exch_path.is_dir():
print(str(exch_path), "not exists")
continue
for prod_path in exch_path.iterdir():
if not prod_path.is_dir():
print(str(prod_path), "not dir")
continue
if len(self.conf['exchanges'][exg]) > 0 and prod_path.name not in self.conf['exchanges'][exg]:
print(prod_path.name, "is skipped")
continue
day_path = prod_path / date_str
save_path = save_folder / exg / prod_path.name / date_str
if not save_path.exists():
Path.mkdir(save_path,parents=True)
if day_path.exists() and day_path.is_dir():
tick_path = day_path / "tick"
for tick_file in tick_path.iterdir():
if tick_file
评论1