from __future__ import annotations
from typing import Any, List
from flowi.components.component_base import ComponentBase
from flowi.experiment_tracking.experiment_tracking import ExperimentTracking
from flowi.prediction.prediction_flow import create_transform_pipeline
from flowi.utilities.logger import Logger
from flowi.utilities.imports import import_class
from flowi.utilities.mongo import Mongo
from flowi.utilities.strings import convert_camel_to_snake
class Node(object):
def __init__(self, id_: str, node: dict, previous_node: str or None, next_node: str or None):
self.id: str = id_
node_type = node["type"]
self.method_name: str = convert_camel_to_snake(node["properties"]["name"])
self.type: str = node_type
self.node_class = node["properties"]["class"]
self.previous_nodes: List[Node] = [previous_node] if previous_node is not None else []
self.next_nodes: List[Node] = [next_node] if next_node is not None else []
self.changed_variables: list = []
self.state: dict = dict()
self.prediction_flow: List[dict] = []
self.finished: bool = False
self._logger = Logger(__name__)
self._experiment_tracking: ExperimentTracking = ExperimentTracking()
self._mongo = Mongo()
self.attributes: dict = node["properties"]["attributes"]
def add_previous_node(self, previous_node: Node or None):
if previous_node is not None:
self.previous_nodes.append(previous_node)
def add_next_node(self, next_node: Node or None):
if next_node is not None:
self.next_nodes.append(next_node)
def _get_class_name(self):
node_type = self.type.lower()
return f"flowi.components.{node_type}.{self.node_class}"
def _import_component(self):
class_name = self._get_class_name()
return import_class(class_name)()
def _prepare_state(self):
state = {
"df": self._append_incoming_df(attribute_type="df"),
"test_df": self._append_incoming_df(attribute_type="test_df"),
"model": self._get_incoming_attribute("model"),
"model_uri": self._get_incoming_attribute("model_uri"),
"parameters": self._get_incoming_attribute("parameters"),
"y_pred": self._get_incoming_attribute("y_pred"),
"y_true": self._get_incoming_attribute("y_true"),
"has_model_selection_in_next_step": self._model_selection_in_next(),
"experiment_id": self._get_incoming_attribute("experiment_id"),
"mongo_id": self._get_incoming_attribute("mongo_id"),
}
return state
def set_current_experiment(self):
if self.type == "Models" or self.type == "ModelSelection":
experiment_id = self._experiment_tracking.get_experiment()
self.state["experiment_id"] = experiment_id
else:
experiment_id = self.state["experiment_id"]
self._experiment_tracking.set_current_experiment(experiment_id)
def _model_selection_in_next(self):
for next_node in self.next_nodes:
if next_node.type == "ModelSelection":
return True
return False
def _append_incoming_df(self, attribute_type: str):
"""
Append (concatenate) incoming dataframes from previous nodes.
:param attribute_type: train_df or test_df
:return: appended (concatenated) dataframe
"""
if len(self.previous_nodes) > 0:
concat_df = self.previous_nodes[0].state[attribute_type]
for i in range(1, len(self.previous_nodes)):
concat_df = concat_df.append(self.previous_nodes[i].state[attribute_type])
return concat_df
else:
return None
def _get_incoming_attribute(self, attribute_name: str):
if len(self.previous_nodes) > 0:
for previous_node in self.previous_nodes:
attribute = previous_node.state[attribute_name]
return attribute
return None
def _get_incoming_prediction_flow(self):
prediction_flow = []
ids = []
if len(self.previous_nodes) > 0:
for previous_node in self.previous_nodes:
current_prediction_flow = previous_node.prediction_flow
for step in current_prediction_flow:
if step["id"] not in ids:
ids.append(step["id"])
prediction_flow.append(step)
return prediction_flow
def _update_prediction_flow(self, result: dict):
self.prediction_flow = self._get_incoming_prediction_flow()
class_name = self._get_class_name()
if not (
self.type in ["Load", "Metrics"]
or (self.type == "Model" and not self.state["has_model_selection_in_next_step"])
):
step = {
"id": self.id,
"class_name": class_name,
"method_name": self.method_name,
"object": result.get("object"),
"transform_input": result.get("transform_input"),
"transform_output": result.get("transform_output"),
}
self.prediction_flow.append(step)
def predict_if_necessary(self):
class_name = self._get_class_name()
if "model_selection" in class_name or (
"model" in class_name and not self.state["has_model_selection_in_next_step"]
):
df = self.state["test_df"]
y_true = df[self.state["target_column"]].values.compute()
df = df.drop(columns=[self.state["target_column"]])
X = df
input_transformer = create_transform_pipeline(
prediction_flow=self.prediction_flow, transform_type="transform_input"
)
input_transformer_uri = ""
if input_transformer is not None:
input_transformer_uri = self._experiment_tracking.save_transformer(
obj=input_transformer, file_path="input_transformer"
)
X = input_transformer.transform(X=df)
y_pred = self.state["model"].predict(X=X)
output_transformer = create_transform_pipeline(
prediction_flow=self.prediction_flow, transform_type="transform_output"
)
output_transformer_uri = ""
if output_transformer is not None:
output_transformer_uri = self._experiment_tracking.save_transformer(
obj=output_transformer, file_path="output_transformer"
)
y_pred = output_transformer.inverse_transform(X=y_pred)
self.state["y_pred"] = y_pred
self.state["y_true"] = y_true
self.state["mongo_id"] = self._mongo.insert(
experiment_id=self.state["experiment_id"],
model_uri=self.state["model_uri"],
input_transformer_uri=input_transformer_uri,
output_transformer_uri=output_transformer_uri,
)
def _pre_run(self):
self.state = self._prepare_state()
self.set_current_experiment()
def run(self, global_variables: dict):
component_class: ComponentBase = self._import_component()
self._pre_run()
result = component_class.apply(self.method_name, self.state, self.attributes)
self.state.update(result)
self._update_prediction_flow(result=result)
self.post_run()
self.finished = True
return result
def post_run(self):
self.predict_if_necessary()
if self.type == "Metrics":
value = self.state[f"metric_{self.method_name}"]
self._mongo.add_metric(mongo_id=self.state["mongo_id"], metric_name=self.method_name, value=value)
没有合适的资源?快使用搜索试试~ 我知道了~
PyPI 官网下载 | flowi-0.3.7.tar.gz
1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉
0 下载量 169 浏览量
2022-01-11
20:04:14
上传
评论
收藏 36KB GZ 举报
温馨提示
共87个文件
py:49个
pyc:36个
pkg-info:1个
资源来自pypi官网。 资源全名:flowi-0.3.7.tar.gz
资源推荐
资源详情
资源评论
收起资源包目录
flowi-0.3.7.tar.gz (87个子文件)
flowi-0.3.7
PKG-INFO 914B
pyproject.toml 895B
setup.py 1KB
flowi
experiment_tracking
experiment_tracking.py 3KB
_base.py 1KB
__pycache__
__init__.cpython-38.pyc 245B
experiment_tracking.cpython-38.pyc 3KB
_base.cpython-38.pyc 2KB
_mlflow.cpython-38.pyc 3KB
__init__.py 82B
_mlflow.py 3KB
utilities
imports.py 269B
logger.py 779B
__pycache__
mongo.cpython-38.pyc 1KB
logger.cpython-38.pyc 1KB
strings.cpython-38.pyc 333B
singleton.cpython-38.pyc 800B
__init__.cpython-38.pyc 134B
imports.cpython-38.pyc 443B
strings.py 112B
__init__.py 0B
singleton.py 416B
mongo.py 3KB
prediction
prediction_offline.py 1KB
__pycache__
__init__.cpython-38.pyc 135B
prediction_flow.cpython-38.pyc 1KB
__init__.py 0B
prediction_flow.py 2KB
connections
aws
__init__.py 0B
s3.py 464B
__init__.py 0B
flow_chart
__pycache__
flow_chart.cpython-38.pyc 3KB
__init__.cpython-38.pyc 135B
node.cpython-38.pyc 6KB
topology.cpython-38.pyc 1KB
flow_chart.py 5KB
__init__.py 0B
node.py 8KB
topology.py 865B
__main__.py 4KB
__init__.py 22B
settings.py 920B
components
models
__pycache__
__init__.cpython-38.pyc 293B
_classification.cpython-38.pyc 3KB
__init__.py 141B
_classification.py 4KB
metrics
__pycache__
__init__.cpython-38.pyc 294B
_classification.cpython-38.pyc 1KB
__init__.py 141B
_classification.py 848B
model_selection
_model_selection.py 2KB
__pycache__
__init__.cpython-38.pyc 303B
_model_selection.cpython-38.pyc 2KB
__init__.py 142B
save
__pycache__
__init__.cpython-38.pyc 293B
_save_local.cpython-38.pyc 2KB
_save_local.py 1KB
__init__.py 138B
__pycache__
__init__.cpython-38.pyc 135B
component_base.cpython-38.pyc 2KB
preprocessing
preprocessing_base.py 836B
_preprocessing.py 2KB
__pycache__
_preprocessing.cpython-38.pyc 2KB
__init__.cpython-38.pyc 298B
preprocessing_labels.py 427B
preprocessing_audio.py 4KB
__init__.py 138B
preprocessing_string.py 2KB
preprocessing_utils.py 4KB
load
__pycache__
__init__.cpython-38.pyc 339B
_load_local.cpython-38.pyc 2KB
_load_s3.cpython-38.pyc 2KB
__init__.py 177B
_load_s3.py 2KB
_load_local.py 2KB
__init__.py 0B
data_preparation
_sklearn.py 617B
__pycache__
__init__.cpython-38.pyc 304B
_sklearn.cpython-38.pyc 1KB
__init__.py 150B
label
__pycache__
__init__.cpython-38.pyc 274B
_label.cpython-38.pyc 2KB
_transformers.cpython-38.pyc 2KB
_transformers.py 1KB
__init__.py 114B
_label.py 2KB
component_base.py 2KB
共 87 条
- 1
资源评论
挣扎的蓝藻
- 粉丝: 13w+
- 资源: 15万+
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功