# coding: utf-8
import time
import json
import multiprocessing
import socket
import uuid
import zmq
from parkworker.const import MONIT_WORKER_HEART_BEAT_PERIOD, MONIT_STATUS_EVENT, MONIT_WORKER_EVENT, MONIT_TASK_EVENT
from parkworker.utils import now, json_default
from parkworker.monits.base import Monit
class BaseMonitWorker(multiprocessing.Process):
id = None
uuid = None
created_dt = None
host_name = None
tasks = None
monit_scheduler_port = None
ZMQ_SERVER_ADDRESS = None
ZMQ_WORKER_REGISTRATOR_PORT = None
worker_type = None
def emit_event(self, *args, **kwargs):
raise NotImplemented()
def setup(self, worker_id=None):
if worker_id is None:
self.id = self.uuid
else:
self.id = worker_id
self.uuid = str(uuid.uuid4())
self.created_dt = now()
self.host_name = socket.gethostname()
self.tasks = dict()
self._register_worker()
def run(self):
print('Worker start %s' % self.id)
heart_beat_process = multiprocessing.Process(target=self._heart_beat)
heart_beat_process.daemon = True
heart_beat_process.start()
self._process_tasks()
def _process_tasks(self):
task_socket = self._get_task_socket()
try:
while True:
task = task_socket.recv_json()
self._register_start_task(task)
monit = Monit.get_monit(task['monit_name'])()
result = monit.check(
host=task['host_address'],
**task['options']
)
self._after_check(task, result)
finally:
self._emit_worker({'stop_dt': now()})
task_socket.close()
def _heart_beat(self):
while True:
self._emit_worker()
time.sleep(MONIT_WORKER_HEART_BEAT_PERIOD)
def _after_check(self, task, result):
self._register_complete_task(task, result)
# get new monitoring results
self.emit_event(MONIT_STATUS_EVENT, json.dumps(task, default=json_default))
def _get_task_socket(self):
context = zmq.Context()
task_socket = context.socket(zmq.PULL)
task_socket.connect("tcp://%s:%s" % (self.ZMQ_SERVER_ADDRESS, self.monit_scheduler_port))
print('MonitWorker connect to', self.ZMQ_SERVER_ADDRESS, self.monit_scheduler_port)
return task_socket
def _register_worker(self):
context = zmq.Context()
register_socket = context.socket(zmq.REQ)
register_socket.connect("tcp://%s:%s" % (self.ZMQ_SERVER_ADDRESS, self.ZMQ_WORKER_REGISTRATOR_PORT))
try:
monit_names = [n for n, _ in Monit.get_all_monits()]
register_data = {
'main': self._get_worker(),
'heart_beat_dt': now(),
'monit_names': monit_names,
}
register_data_json = json.dumps(register_data, default=json_default)
register_socket.send_string(register_data_json)
# print('register_worker send', register_data_json)
keeper_answer = register_socket.recv_string()
# print('register_worker got', keeper_answer)
answer_data = json.loads(keeper_answer)
self.monit_scheduler_port = answer_data['monit_scheduler_port']
finally:
register_socket.close()
def _register_start_task(self, task):
print("Worker %s. Received request: %s for %s. %s" % (self.id, task['monit_name'], task['host_address'], now()))
self._add_current_task(task)
task['start_dt'] = now()
task['worker'] = self._get_worker()
self.emit_event(MONIT_TASK_EVENT, json.dumps(task, default=json_default))
def _register_complete_task(self, task, result):
self._rm_current_task(task)
task['result'] = result.get_dict()
self.emit_event(MONIT_TASK_EVENT, json.dumps(task, default=json_default))
def _add_current_task(self, task):
task_id = self._get_task_id(task)
self.tasks[task_id] = task
self._emit_worker({'tasks': list(self.tasks.keys())})
def _rm_current_task(self, task):
task_id = self._get_task_id(task)
del self.tasks[task_id]
self._emit_worker({'tasks': list(self.tasks.keys())})
def _get_worker(self):
return {
'id': str(self.id),
'uuid': self.uuid,
'created_dt': self.created_dt,
'host_name': self.host_name,
'type': self.worker_type,
}
def _emit_worker(self, data=None):
worker_data = {
'main': self._get_worker(),
'heart_beat_dt': now(),
}
if data:
worker_data.update(data)
worker_data_json = json.dumps(worker_data, default=json_default)
self.emit_event(MONIT_WORKER_EVENT, worker_data_json)
@staticmethod
def _get_task_id(task):
return task['_id']['$oid']
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
资源分类:Python库 所属语言:Python 资源全名:park-worker-base-0.0.3.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059
资源推荐
资源详情
资源评论
收起资源包目录
park-worker-base-0.0.3.tar.gz (9个子文件)
park-worker-base-0.0.3
PKG-INFO 282B
setup.py 451B
parkworker
const.py 324B
monit_worker.py 5KB
utils.py 457B
monits
__init__.py 16B
base.py 2KB
settings_default.py 152B
__init__.py 16B
共 9 条
- 1
资源评论
挣扎的蓝藻
- 粉丝: 14w+
- 资源: 15万+
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功