import datetime
import warnings
import numpy as np
import pandas as pd
import requests
from ray import serve
# def vector_backtest(transfer_position: pd.DataFrame, asset_price=None, asset_yield=None, return_holding_weight=False):
def vector_backtest(transfer_position: dict, asset_price: dict):
"""
基于矩阵运算的回测算法
Parameters
----------
transfer_position : list or DataFrame
调仓记录,其中日期格式和资产代码格式与传入的asset_yield或asset_price保持一致即可,权重需乘100%;
若有现金部分,需设为'cash',传入现金比例。
若为list,格式为(key名需保持一致)::
[{'date': '2020-12-31',
'value': [{'tradingcode': '000001.SZ', 'weight': 30},
{'tradingcode': '000002.SZ', 'weight': 60},
{'tradingcode': 'cash', 'weight': 10}]
},
{'date': '2021-01-31',
'value': [{'tradingcode': '000001.SZ', 'weight': 20},
{'tradingcode': '000002.SZ', 'weight': 50},
{'tradingcode': '000004,SZ', 'weight': 30}]
}]
若为DataFrame,格式为::
'000001.SZ' '000002.SZ' '000004.SZ' 'cash'
'2020-12-31' 30 60 0.0(或NAN) 10
'2021-01-31' 20 50 30 0.0
asset_price : DataFrame
资产的价格序列,index为日期,columns为资产名称或代码等,即每一列为一个资产的价格序列
asset_yield : DataFrame
资产的收益率序列,可选择传入收益率序列或价格序列,index为日期,columns为资产名称或代码等,即每一列为一个资产的收益率序列(乘以100%)
return_holding_weight : bool
是否返回每日的持仓
Returns
-------
portfolio_nav : pd.Series
组合的净值序列,index为日期
holding_weight : DataFrame
组合每日的持仓权重, index为日期,columns为资产名称或代码。return_holding_weight=False时不返回
"""
transfer_position = pd.DataFrame(transfer_position)
asset_price = pd.DataFrame(asset_price)
asset_yield = None
return_holding_weight = False
if isinstance(transfer_position, list) and transfer_position:
# 将list转换为dataframe格式
transfer_position_df = pd.DataFrame(transfer_position).groupby('date').apply(
lambda x: pd.DataFrame(list(x['value'])[0]))
transfer_position_df = (transfer_position_df.reset_index())[['date', 'tradingcode', 'weight']].copy()
transfer_position = transfer_position_df.pivot(index='date', columns='tradingcode', values='weight')
transfer_position.sort_index(inplace=True)
# transfer_position.fillna(0.0, inplace=True)
# 权重归一化
transfer_position = (transfer_position.div(transfer_position.sum(axis=1), axis=0) * 100).copy()
transfer_position.fillna(0.0, inplace=True)
# 删除有现金的一列,后面会重新计算现金比例
if 'cash' in transfer_position.columns.tolist():
transfer_position.drop(labels='cash', axis=1, inplace=True)
if asset_price is None and asset_yield is None:
raise ValueError("asset_price and asset_yield cannot both be None")
# 收益率序列转换为价格序列
if asset_price is None:
asset_yield.sort_index(inplace=True)
asset_yield.fillna(0.0, inplace=True)
asset_price = (asset_yield / 100 + 1).cumprod(axis=0)
asset_price.sort_index(inplace=True)
# 对价格序列做处理,<= 0的置为NaN
asset_price = asset_price.where(asset_price > 0, np.nan)
asset_price.fillna(method="ffill", inplace=True)
# 调仓日期
transfer_index = np.sort(transfer_position.index)
# 价格序列日期
price_index = np.sort(asset_price.index)
# 判断价格日期是否包含所有调仓日期
if any(transfer_index > price_index[-1]) or any(transfer_index < price_index[0]):
warnings.warn("the earliest and latest date of asset_price or asset_yield is {} and {}, "
"not contain all transfer date {}".format(price_index[0], price_index[-1], transfer_index))
transfer_position = transfer_position.loc[
transfer_index[(transfer_index <= price_index[-1]) & (transfer_index >= price_index[0])]].copy()
transfer_index = np.sort(transfer_position.index)
# 若调仓日期不是交易日,则往后取最近的一个交易日
transfer_index = [np.min(price_index[price_index >= transfer_index[i]]) for i in range(len(transfer_index))]
transfer_position.index = transfer_index
# 若有重复的调仓日,取最新调仓记录进行调仓
transfer_position = transfer_position[~transfer_position.index.duplicated(keep='last')].copy()
transfer_index = np.sort(transfer_position.index)
# 若价格序列的最后日期等于最后的调仓日期,则计算组合净值时无需考虑这次调仓,但返回每日调仓时,需要将该日的持仓权重改为调仓后的权重
is_last_transfer = False
if transfer_index[-1] == price_index[-1]:
is_last_transfer = True
transfer_index = transfer_index[:-1].copy()
# 获取调仓日的后一个交易日,即权重生效日
price_index = list(price_index)
reset_index = [transfer_index[0]] + [price_index[price_index.index(transfer_index[i]) + 1] for i in
range(1, len(transfer_index))]
# 判断价格序列的资产是否包含所有涉及调仓的资产
transfer_assets = transfer_position.columns.tolist()
price_assets = asset_price.columns.tolist()
if not set(transfer_assets) <= set(price_assets):
add_assets = list(set(transfer_assets) - set(price_assets))
warnings.warn("asset_price or asset_yield does not contain data for asset {}".format(add_assets))
for add_asset in add_assets:
asset_price[add_asset] = np.nan
# 使资产价格序列和调仓权重的列名顺序一致
asset_price = asset_price[transfer_assets].copy()
transfer_position = transfer_position[transfer_assets].copy()
# 判断调仓日所有资产是否都有行情值,若没有,相当于未下单成功,则持仓设为0,加入现金部分
transfer_price = (asset_price.loc[transfer_index]).copy()
transfer_position[transfer_price.isnull()] = 0.0
transfer_position['cash'] = 100.0 - transfer_position.sum(axis=1)
asset_price['cash'] = 1.0
# 保证资产价格序列和调仓权重的列名顺序一致
transfer_assets = transfer_position.columns.tolist()
asset_price = asset_price[transfer_assets].copy()
transfer_position = transfer_position[transfer_assets].copy()
# 基准矩阵,计算每两个调仓期区间的累计收益
bench_price = pd.DataFrame(index=asset_price.index, columns=asset_price.columns)
_bench_price = (asset_price.loc[transfer_index]).copy()
_bench_price.index = reset_index
bench_price.loc[reset_index] = _bench_price.loc[reset_index]
bench_price.fillna(method='ffill', inplace=True)
# 权重矩阵
weight_ts = pd.DataFrame(index=asset_price.index, columns=asset_price.columns)
_weight_ts = (transfer_position.loc[transfer_index]).copy()
_weight_ts.index = reset_index
weight_ts.loc[reset_index] = _weight_ts.loc[reset_index]
weight_ts.fillna(method='ffill', inplace=True)
# 计算每两个调仓期区间的组合收益
interval_cum_return = asset_price / bench_price
interval_cum_return = pd.DataFrame((interval_cum_return * weight_ts / 100).sum(axis=1), columns=['returns'])
transfer_cum_return
ray serve test 2
需积分: 0 64 浏览量
更新于2023-04-25
收藏 625KB ZIP 举报
Ray Serve 是一个强大的分布式服务框架,它作为 Ray 的一部分,专为 Python 应用程序设计,用于构建和部署机器学习模型、微服务以及任何其他可调用对象。这个"ray serve test 2"可能是一个测试项目,展示了如何利用 Ray Serve 来搭建、管理和扩展你的服务。
在 Ray Serve 中,你可以定义服务(services)和端点(endpoints),服务是实际执行任务的可调用对象,而端点则是服务的入口,允许外部客户端通过 HTTP 请求进行交互。这种设计使得开发者可以轻松地将模型部署为高可用的服务,并且能够灵活地进行水平扩展。
让我们详细了解一下 Ray Serve 的核心概念:
1. **定义服务**:在 Ray Serve 中,你可以定义一个 `ServeDeployment` 对象,它包含了你的可调用对象(如一个机器学习模型)和相关的配置信息,如副本数量、资源需求等。例如:
```python
from ray import serve
from my_model import MyModel
@serve.deployment(name="my_model", num_replicas=2)
class MyModelDeployment:
def __init__(self):
self.model = MyModel()
@serve.http_endpoint("/predict", methods=["POST"])
def predict(self, request):
# 处理请求并返回预测结果
...
```
2. **启动 Serve**:启动 Ray Serve 集群非常简单,只需要一行代码:
```python
serve.start()
```
3. **部署服务**:一旦 Serve 集群启动,你可以部署上面定义的服务:
```python
MyModelDeployment.deploy()
```
4. **创建端点**:服务可以有多个端点,每个端点对应不同的 API 路径和处理函数。在上面的例子中,`/predict` 就是一个端点,接受 POST 请求。
5. **客户端交互**:客户端可以通过 HTTP 请求与服务进行交互。例如,使用 Python 的 `requests` 库:
```python
import requests
response = requests.post("http://localhost:8000/my_model/predict", json=input_data)
prediction = response.json()
```
6. **动态扩展**:Ray Serve 允许你根据负载情况动态调整服务的副本数量,以实现弹性伸缩:
```python
MyModelDeployment.scale(4) # 增加到 4 个副本
MyModelDeployment.scale(-2) # 减少 2 个副本
```
7. **监控和调试**:Ray Serve 提供了内置的指标收集和 Prometheus 支持,可以帮助你监控服务的性能和健康状况。
在 "rayserve_test" 文件中,可能包含了创建、部署和测试 Ray Serve 服务的脚本。这些脚本可能涉及模型加载、服务定义、端点设置、客户端请求以及性能测试等环节。通过分析和运行这些测试代码,你可以深入理解 Ray Serve 如何在实际场景中工作,如何优化服务性能,以及如何处理并发请求等。
Ray Serve 提供了一个高效、灵活的平台,让数据科学家和工程师能够快速将他们的模型部署为生产级服务,同时享受到 Ray 的分布式计算能力。这个"ray serve test 2"项目为你提供了一个实践和学习 Ray Serve 的机会,通过它,你可以掌握服务部署、扩展和管理的技巧,提升你的分布式系统开发能力。
GammaGao
- 粉丝: 47
- 资源: 40
最新资源
- 层次特征融合框架在适应性视觉跟踪中的粒子滤波器应用
- comsol 平板动网格电弧仿真 耦合了流体传热 电磁场 层流等多个物理场 可以修改电极材料、距离、电路、电极移动速度
- 基于CNN卷积网络搭建人脸识别模型源码-完成学生人脸签到系统.zip
- 开发一个图书管理系统,是一个简化版本的数据库应用程序
- emqx-5.0.22-windows-amd64
- 心脏MR图像中心室分割基于活动轮廓模型与非线性形状先验的应用
- 永磁同步电机的双环以及三环控制仿真模型以及参考资料
- 基于多尺度纹理模型的乳腺超声图像级集分割方法及其临床评估
- Python机器人-这是机器人算法的 Python 代码集合
- VSG模型同步机构网型逆变器Matlab 具备VSG功能的逆变器仿真模型,同步发电机,构网型逆变器,基于MATLAB Simulink建模仿真 具备一次调频,惯性阻尼,一次调压 可以运行于离网模式
- 基于单层圆形麦克风阵列采集音频实现MUSIC算法的声源定位python实现源码+说明
- 基于区域活性轮廓模型的图像对象与背景提取方法
- 西门子Siemens PLc程序,TiA博途V15.1 V16 V17版冷热水恒压供水系统,变频器控制,模拟量输入和输出处理,温度控制,流量计算控制,配方控制,LAd和ScL语言
- 最新注册功能,包含sql脚本,readme.txt,源码文件
- 图像拟合的活跃几何形状模型及其在脑脊液结构检测与分割中的应用
- Proteus 8 Professional软件使用.zip