from x_com import xport_core
from x_com import source_api
from datetime import datetime
from x_com.source_api import QueryDayKCondition,QueryMinkKCondition, DayResponse, MinResponse, DayKData, MinKData
import numpy as np
import pandas as pd
from typing import Dict
# Module-level logger obtained from xport_core; OptionService uses it for its
# on_init / on_finalize lifecycle messages.
logger = xport_core.getLogger()
class OptionService:
    """Serve day-bar and minute-bar queries from HDF5 files on local disk.

    Day bars are read from a single HDF5 store (one key per order book id);
    minute bars are read from one HDF5 file per order book id.
    """

    async def on_init(self):
        """Record the day-bar HDF5 store path and log that init ran."""
        self.daybar_h5_file_path = r'D:\dxmesh\option\daybar\etf_option_daybar.h5'
        logger.info("on_init...")

    async def on_finalize(self):
        """Log that finalization ran."""
        logger.info("on_finalize...")

    @staticmethod
    def _read_day_bars(h5_file, order_book_id, start_date, end_date, high_price_limit):
        """Load day bars for one instrument and filter them.

        Args:
            h5_file: Path of the HDF5 store to open read-only.
            order_book_id: Key of the table to select from the store.
            start_date: Inclusive lower bound, formatted ``%Y%m%d``.
            end_date: Inclusive upper bound, formatted ``%Y%m%d``.
            high_price_limit: When ``>= 0.0``, only rows whose ``high`` is at
                least this value are kept (in addition to the date range).
                A negative value disables the price filter.

        Returns:
            A list of ``source_api.DayKData`` objects, one per matching row.

        Raises:
            ValueError: If ``start_date`` or ``end_date`` does not match
                ``%Y%m%d`` (raised by ``datetime.strptime``).
        """
        # Read the HDF5 file; the selected table is fully loaded into memory,
        # so the store can be closed before any filtering happens.
        with pd.HDFStore(h5_file, 'r') as store:
            frame = pd.DataFrame(store.select(order_book_id))

        # Convert the date column and the query bounds to comparable types.
        frame['date'] = pd.to_datetime(frame['date'], format="%Y%m%d")
        start_datetime = datetime.strptime(start_date, "%Y%m%d")
        end_datetime = datetime.strptime(end_date, "%Y%m%d")

        # Always restrict to the requested date range. Previously the price
        # filter replaced the date filter instead of narrowing it, so rows
        # outside [start_date, end_date] leaked into the result.
        mask = (frame['date'] >= start_datetime) & (frame['date'] <= end_datetime)
        if high_price_limit >= 0.0:
            mask = mask & (frame['high'] >= high_price_limit)
        selected = frame[mask]

        bars = []
        for _, row in selected.iterrows():
            bar = source_api.DayKData()
            # NOTE(review): attribute name 'gretting' is kept exactly as the
            # original code set it; confirm it is an intended DayKData field.
            bar.gretting = []
            bar.date = row['date'].strftime("%Y%m%d")
            bar.open_price = row['open']
            bar.high_price = row['high']
            bar.low_price = row['low']
            bar.close_price = row['close']
            bar.volume = row['volume']
            bar.total_turnover = row['total_turnover']
            bars.append(bar)
        return bars

    @staticmethod
    def _read_min_bars(h5_file, start_date, end_date):
        """Load minute bars from ``h5_file`` within a datetime window.

        Args:
            h5_file: Path of the HDF5 file to open read-only; the table is
                read from the ``'data'`` key.
            start_date: Inclusive lower bound, formatted ``%Y%m%d%H%M%S``.
            end_date: Inclusive upper bound, formatted ``%Y%m%d%H%M%S``.

        Returns:
            A list of ``source_api.MinKData`` objects, one per matching row.
            Bounds that fail to parse become ``NaT`` (``errors='coerce'``),
            which compares false against every row and yields an empty list.
        """
        # Read the HDF5 file.
        with pd.HDFStore(h5_file, 'r') as store:
            data = store.select('data')

        # Convert the string date/datetime columns to datetime values.
        data['trading_date'] = pd.to_datetime(data['trading_date'], format="%Y%m%d")
        data['datetime'] = pd.to_datetime(data['datetime'], format='%Y%m%d%H%M%S', errors='coerce')
        # Drop rows whose datetime could not be parsed (NaT).
        data = data.dropna(subset=['datetime'])

        # Convert the query bounds to pandas timestamps.
        start_date = pd.to_datetime(start_date, format='%Y%m%d%H%M%S', errors='coerce')
        end_date = pd.to_datetime(end_date, format='%Y%m%d%H%M%S', errors='coerce')

        # Keep only rows inside the inclusive [start_date, end_date] window.
        in_window = (data['datetime'] >= start_date) & (data['datetime'] <= end_date)
        data = data[in_window]

        bars = []
        for _, row in data.iterrows():
            bar = source_api.MinKData()
            bar.datetime = row['datetime'].strftime("%Y%m%d%H%M%S")
            bar.trading_date = row['trading_date'].strftime("%Y%m%d")
            bar.open_price = row['open']
            bar.high_price = row['high']
            bar.low_price = row['low']
            bar.close_price = row['close']
            bar.volume = row['volume']
            bar.total_turnover = row['total_turnover']
            bars.append(bar)
        return bars

    # QueryDayK
    async def query_day_k(self, ctx: xport_core.Context, param: source_api.QueryDayKCondition) -> source_api.DayResponse:
        """Return day bars for ``param.order_book_id`` within the requested dates.

        Reads ``order_book_id``, ``start_date``, ``end_date`` and
        ``high_price_limit`` from ``param`` and loads the matching rows from
        the day-bar store configured in ``on_init``.
        """
        day_bars = self._read_day_bars(
            self.daybar_h5_file_path,
            param.order_book_id,
            param.start_date,
            param.end_date,
            param.high_price_limit,
        )
        response = source_api.DayResponse()
        response.day_k_data = day_bars
        return response

    # QueryMinK
    async def query_min_k(self, ctx: xport_core.Context, param: source_api.QueryMinkKCondition) -> source_api.MinResponse:
        """Return minute bars for ``param.order_book_id`` within the requested window.

        The minute-bar file is located by order book id; ``start_datetime``
        and ``end_datetime`` from ``param`` bound the result inclusively.
        """
        # NOTE(review): order_book_id is inserted into the file path without
        # validation; confirm callers cannot supply path separators.
        minbar_h5_file_path = r'D:\dxmesh\option\minbar' + '\\' + param.order_book_id + '.h5'
        min_bars = self._read_min_bars(
            minbar_h5_file_path,
            param.start_datetime,
            param.end_datetime,
        )
        response = source_api.MinResponse()
        response.min_k_data = min_bars
        return response