from arkitekt.monitor.monitor import get_current_monitor
from herre.console.context import get_current_console
from uuid import uuid4
from arkitekt.packers.utils import expand_outputs, shrink_inputs
from asyncio.futures import Future
from arkitekt.messages.postman.log import LogLevel
from contextvars import Context
from arkitekt.messages.postman.reserve.reserve_transition import ReserveState
from arkitekt.contracts.exceptions import AssignmentException
from arkitekt.messages import *
from arkitekt.schema.enums import NodeType
from arkitekt.messages.postman.reserve.params import ReserveParams
from arkitekt.postman import Postman
from rich.table import Table
from rich.panel import Panel
from arkitekt.monitor import Monitor, current_monitor
from arkitekt.registry import get_current_postman
import asyncio
import logging
from herre.herre import Herre, get_current_herre
from koil.koil import Koil, get_current_koil
from koil.loop import koil, koil_gen
logger = logging.getLogger(__name__)
class UnknownMessageError(Exception):
    """Exception type for messages that cannot be identified.

    NOTE(review): not raised in the visible part of this module; the intent
    is inferred from the class name only.
    """
class ReservationError(Exception):
    """Common base class for the reservation errors defined below."""


class CouldNotReserveError(ReservationError):
    """Reservation error subtype.

    NOTE(review): presumably signals that a reservation could not be
    established; not raised in the visible part of this module.
    """


class IncorrectStateForAssignation(ReservationError):
    """Reservation error subtype.

    NOTE(review): presumably signals that the reservation was in a state
    that does not allow assigning; not raised in the visible part of this
    module.
    """
class AssignationError(Exception):
    """Common base class for assignation errors (see ``Omitted``)."""


class Omitted(AssignationError):
    """Assignation error subtype.

    NOTE(review): presumably related to the ``omit_on`` states of a
    reservation; not raised in the visible part of this module.
    """
def build_reserve_message(
    reference,
    node_id: str = None,
    template_id: str = None,
    provision: str = None,
    params_dict: dict = None,
    with_log=False,
    context=None,
):
    """Build the message that requests a reservation.

    Args:
        reference: Reference stored in the message meta. Must not be None.
        node_id (str, optional): Sent as ``data["node"]``.
        template_id (str, optional): Sent as ``data["template"]``.
        provision (str, optional): Sent as ``data["provision"]``.
        params_dict (dict, optional): Sent as ``data["params"]``. When left
            as None a fresh empty dict is used for this call.
        with_log (bool, optional): Sent as the ``with_progress`` extension.
        context (optional): When truthy it is added to the meta and a
            bounced message is built instead of a plain one.

    Returns:
        A ``BouncedReserveMessage`` when ``context`` is truthy, otherwise a
        ``ReserveMessage``.

    Raises:
        AssertionError: If ``reference`` is None, or if neither ``node_id``
            nor ``template_id`` is given.
    """
    assert reference is not None, "Must have a reference"
    assert (
        node_id is not None or template_id is not None
    ), "Please provide either a node_id or template_id"

    # A ``{}`` default would be one dict object shared by every call; create
    # a new one per call so messages can never share (and mutate) params.
    if params_dict is None:
        params_dict = {}

    data = {
        "node": node_id,
        "template": template_id,
        "provision": provision,
        "params": params_dict,
    }
    meta = {
        "reference": reference,
        "extensions": {
            "with_progress": with_log,
        },
    }

    if context:
        meta = {**meta, "context": context}
        return BouncedReserveMessage(data=data, meta=meta)
    return ReserveMessage(data=data, meta=meta)
def build_unreserve_messsage(reference, reservation, with_log=False, context=None):
    """Build the message that releases a reservation.

    Args:
        reference: Reference stored in the message meta. Must not be None.
        reservation: Sent as ``data["reservation"]``.
        with_log (bool, optional): Sent as the ``with_progress`` extension.
        context (optional): When truthy it is added to the meta and a
            bounced message is built instead of a plain one.

    Returns:
        A ``BouncedUnreserveMessage`` when ``context`` is truthy, otherwise
        an ``UnreserveMessage``.

    Raises:
        AssertionError: If ``reference`` is None.
    """
    assert reference is not None, "Must have a reference"

    payload = {"reservation": reservation}
    envelope = {"reference": reference, "extensions": {"with_progress": with_log}}

    if not context:
        return UnreserveMessage(data=payload, meta=envelope)

    envelope["context"] = context
    return BouncedUnreserveMessage(data=payload, meta=envelope)
def build_assign_message(
    reference, reservation, args, kwargs, with_log=False, context=None, persist=False
):
    """Build the message that assigns work to a reservation.

    Args:
        reference: Reference stored in the message meta. Must not be None.
        reservation: Sent as ``data["reservation"]``.
        args: Sent as ``data["args"]``.
        kwargs: Sent as ``data["kwargs"]``.
        with_log (bool, optional): Sent as the ``with_progress`` extension.
        context (optional): When truthy it is added to the meta and a
            bounced message is built instead of a plain one.
        persist (bool, optional): Sent as the ``persist`` extension.

    Returns:
        A ``BouncedAssignMessage`` when ``context`` is truthy, otherwise an
        ``AssignMessage``.

    Raises:
        AssertionError: If ``reference`` is None.
    """
    assert reference is not None, "Must have a reference"

    payload = {"reservation": reservation, "args": args, "kwargs": kwargs}
    extensions = {"with_progress": with_log, "persist": persist}
    envelope = {"reference": reference, "extensions": extensions}

    if not context:
        return AssignMessage(data=payload, meta=envelope)

    envelope["context"] = context
    return BouncedAssignMessage(data=payload, meta=envelope)
def build_unassign_messsage(
    reference, assignation, provision, with_log=False, context=None, persist=False
):
    """Build the message that cancels an assignation.

    Args:
        reference: Reference stored in the message meta. Must not be None.
        assignation: Sent as ``data["assignation"]``.
        provision: Sent as ``data["provision"]``.
        with_log (bool, optional): Sent as the ``with_progress`` extension.
        context (optional): When truthy it is added to the meta and a
            bounced message is built instead of a plain one.
        persist (bool, optional): Sent as the ``persist`` extension.

    Returns:
        A ``BouncedUnassignMessage`` when ``context`` is truthy, otherwise
        an ``UnassignMessage``.

    Raises:
        AssertionError: If ``reference`` is None.
    """
    assert reference is not None, "Must have a reference"

    payload = {"assignation": assignation, "provision": provision}
    extensions = {"with_progress": with_log, "persist": persist}
    envelope = {"reference": reference, "extensions": extensions}

    if not context:
        return UnassignMessage(data=payload, meta=envelope)

    envelope["context"] = context
    return BouncedUnassignMessage(data=payload, meta=envelope)
class Reservation:
def __init__(
    self,
    node,
    reference: str = None,
    provision: str = None,
    monitor: Monitor = None,
    ignore_node_exceptions=False,
    transition_hook=None,
    with_log=False,
    omit_on=[],
    enter_on=[ReserveState.ACTIVE],
    exit_on=[ReserveState.ERROR, ReserveState.CANCELLED, ReserveState.CRITICAL],
    context: Context = None,
    koil: Koil = None,
    herre: Herre = None,
    postman: Postman = None,
    **params,
) -> None:
    """Set up a reservation for ``node``.

    Args:
        node: The node this reservation targets.
        reference (str, optional): Reference of the reservation. A random
            UUID4 string is generated when it is not given.
        provision (str, optional): Stored as ``self.provision``.
        monitor (Monitor, optional): Monitor to report to. Falls back to
            ``get_current_monitor()``.
        ignore_node_exceptions (bool, optional): Stored as
            ``self.ignore_node_exceptions``.
        transition_hook (optional): Coroutine function awaited on state
            transitions, or None.
        with_log (bool, optional): When falsy, the monitor's ``log``
            attribute is used instead (None without a monitor).
        omit_on (list, optional): States stored as ``self.omit_states``.
        enter_on (list, optional): States stored as ``self.enter_states``.
        exit_on (list, optional): States stored as ``self.exit_states``.
        context (Context, optional): Bounce context. Requires the
            ``can_forward_bounce`` scope on the herre state when truthy.
        koil (Koil, optional): Accepted but not used by this constructor.
        herre (Herre, optional): Falls back to ``get_current_herre()``.
        postman (Postman, optional): Falls back to
            ``get_current_postman(force_creation=True)``.
        **params: Forwarded to ``ReserveParams``.

    Raises:
        AssertionError: If ``context`` is given without the required scope,
            or ``transition_hook`` is neither None nor a coroutine function.
    """
    self.monitor: Monitor = monitor or get_current_monitor()
    self.panel = (
        self.monitor.create_reservation_panel(self) if self.monitor else None
    )
    self.console = get_current_console()
    self.herre = herre or get_current_herre()
    self.postman = postman or get_current_postman(force_creation=True)

    # Reservation params
    # The module imports the function (``from uuid import uuid4``), not the
    # ``uuid`` module, so ``uuid.uuid4()`` raised NameError whenever no
    # reference was passed.
    self.reference = reference or str(uuid4())
    self.provision = provision
    self.node = node
    self.params = ReserveParams(**params)
    self.with_log = with_log or (self.monitor.log if self.monitor else None)

    self.context = context  # with_bounced allows us forward bounced checks
    if self.context:
        assert (
            "can_forward_bounce" in self.herre.state.scopes
        ), "In order to use with_bounced forwarding you need to have the can_forward_bounced scope"

    # Exception management
    self.ignore_node_exceptions = ignore_node_exceptions
    self.critical_error = None

    # State management
    self.transition_hook = transition_hook
    assert self.transition_hook is None or asyncio.iscoroutinefunction(
        self.transition_hook
    ), "Transition Hook must be either a coroutine or set to None"

    # Copy the state lists: the signature defaults are single list objects
    # shared by every instance, so storing them directly would let one
    # reservation's mutation leak into all others.
    self.exit_states = list(exit_on)
    self.enter_states = list(enter_on)
    self.omit_states = list(omit_on)
    self.current_state = ReserveState.STARTING
def log(self, message: str, level: LogLevel = LogLevel.DEBUG):
    """Record a message for this reservation.

    The message is forwarded to the reservation's panel when one exists,
    and is always written to the module logger at INFO level, prefixed
    with ``level``.

    Args:
        message (str): The text to record.
        level (LogLevel, optional): Level passed to the panel and used as
            the prefix of the logger line. Defaults to LogLevel.DEBUG.
    """
    panel = self.panel
    if panel:
        panel.log(message, level=level)

    logger.info(f"{level}: {message}")
async def transition_state(self, message: ReserveTransitionMessage):
# Apply a reserve-transition message to this reservation: run the optional
# hook, react to exit/enter states, then record the new state.
# NOTE(review): the indentation of this file was lost, so the exact nesting
# of the statements below cannot be confirmed from here; the comments
# describe each statement without asserting which branch encloses it.
# NOTE(review): self.enter_future and self.is_closing are not assigned in
# the visible constructor; presumably they are set elsewhere in the class.
# Once we acquire a reserved resource our contract (the inner part of the context can start)
# The hook, when set, is awaited with this reservation and the new state.
if self.transition_hook:
await self.transition_hook(self, message.data.state)
# Exit states: what happens depends on whether enter_future is already done.
if message.data.state in self.exit_states:
if self.enter_future.done():
# enter_future already done: only log; nothing is raised here.
self.log(
f"We have transitioned to a critical State {message.data.message}. Terminating on Next Call"
)
else:
# enter_future still pending: log, print the raw message and raise a
# plain Exception carrying the transition's message text.
self.log("Cancelling Reservation")
print(message)
raise Exception(message.data.message)
# Logged at CRITICAL level when the reservation is not flagged as closing.
if not self.is_closing:
self.log(
f"Received Exitstate: {message.data.state}. Closing reservation at next assignment",
level=LogLevel.CRITICAL,
)
# Enter states: resolve enter_future with the message reference, unless it
# is already done.
if message.data.state in self.enter_states:
if self.enter_future.done():
logger.info("We are already entered.")
else:
self.enter_future.set_result(message.meta.reference)
# Keep the previous state, store the new one and log the transition at INFO.
self.old_state = self.current_state
self.current_state = message.data.state
self.log(
f"[red] {self.old_state} > {self.current_state}: {message.data.message}",
LogLevel.INFO,
)
async def assign_async(
self,
*args,
bypass_shrink=False,
bypass_expand=False,
persist=True,
with_log=True,
context=None,
raise_node_exceptions=True,
**kwargs,
):
logger.info(f"Assigning {args} {kwargs} ")
assert (
self.node.type == NodeType.FUNCTION
), "You cannot assign to a Generator Node, use
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
PyPI 官网下载 | arkitekt-0.1.90.tar.gz (158个子文件)
red pulse.gif 106KB
red pulse.gif 106KB
pink pulse.gif 105KB
pink pulse.gif 105KB
green pulse.gif 103KB
green pulse.gif 103KB
orange pulse.gif 93KB
orange pulse.gif 93KB
PKG-INFO 628B
gear.png 10KB
gear.png 6KB
reservation.py 23KB
functional.py 14KB
actify.py 13KB
ports.py 11KB
app.py 9KB
base.py 8KB
main.py 7KB
base.py 7KB
magic_bar.py 6KB
websocket.py 5KB
websocket.py 5KB
agent.py 5KB
monitor.py 5KB
utils.py 5KB
templates.py 5KB
standard.py 5KB
assign_widget.py 4KB
node.py 4KB
autoreload.py 4KB
postman.py 4KB
node.py 3KB
provisions.py 3KB
base.py 3KB
registry.py 3KB
utils.py 3KB
actor.py 3KB
qtlistsearchwidget.py 2KB
registry.py 2KB
types.py 2KB
base.py 2KB
registry.py 2KB
setup.py 2KB
utils.py 2KB
comfort.py 2KB
provide_transition.py 2KB
base.py 2KB
registry.py 2KB
base.py 1KB
script.py 1KB
qtsliderwidget.py 1KB
graphql.py 1KB
base.py 1KB
widgets.py 1KB
reserve_transition.py 1KB
structure.py 1KB
base.py 1020B
provide_log.py 883B
assign_critical.py 876B
bounced_forwarded_reserve.py 859B
bounced_assign.py 856B
bounced_forwarded_assign.py 836B
bounced_forwarded_unassign.py 828B
bounced_unassign.py 815B
bounced_provide.py 814B
exception.py 810B
bounced_reserve.py 805B
assign_log.py 803B
settings_popup.py 780B
provide.py 768B
config.py 756B
unprovide_critical.py 744B
assign.py 744B
reserve.py 743B
registry.py 727B
template.py 726B
provide_bound.py 718B
__init__.py 714B
unprovide.py 709B
basic.py 697B
threadvars.py 696B
unassign.py 695B
reserve_critical.py 694B
unassign_done.py 693B
bounced_unprovide.py 691B
assign_yield.py 685B
unreserve_done.py 682B
reserve_waiting.py 679B
assign_cancelled.py 675B
agent_disconnect.py 673B
assign_return.py 672B
unprovide_error.py 669B
unprovide_done.py 666B
reserve_done.py 666B
agent_connect.py 664B
provide_done.py 661B
ward.py 654B
bounced_forwarded_unreserve.py 653B
bounced_unreserve.py 653B
unreserve_critical.py 651B
共 158 条
- 1
- 2
资源评论
挣扎的蓝藻
- 粉丝: 13w+
- 资源: 15万+
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功