import logging
import random
import time
import asyncore
import socket
from collections import deque
from gearman.protocol import DEFAULT_PORT, ProtocolError, parse_command, pack_command
class GearmanServerClient(asyncore.dispatcher):
def __init__(self, sock, addr, server, manager):
asyncore.dispatcher.__init__(self, sock)
self.addr = addr
self.server = server
self.manager = manager
self.in_buffer = ""
self.out_buffer = ""
manager.register_client(self)
def writable(self):
return len(self.out_buffer) != 0
def handle_close(self):
self.close()
self.manager.deregister_client(self)
def handle_read(self):
data = self.recv(8192)
if not data:
self.close()
return
self.in_buffer += data
commands = []
while True:
try:
func, args, cmd_len = parse_command(self.in_buffer)
except ProtocolError, exc:
logging.error("[%s] ProtocolError: %s" % (self.addr, str(exc)))
self.close()
return
if not func:
break
self.handle_command(func, args)
self.in_buffer = buffer(self.in_buffer, cmd_len)
def handle_command(self, func, args):
if func == "echo_req":
self.send_command("echo_res", args)
elif func == "submit_job":
handle = self.manager.add_job(self, **args)
self.send_command("job_created", {'handle': handle})
elif func == "submit_job_high":
handle = self.manager.add_job(self, high=True, **args)
self.send_command("job_created", {'handle': handle})
elif func == "submit_job_bg":
handle = self.manager.add_job(self, bg=True, **args)
self.send_command("job_created", {'handle': handle})
elif func in ("can_do", "can_do_timeout"):
self.manager.can_do(self, **args)
elif func == "cant_do":
self.manager.cant_do(self, **args)
elif func == "grab_job":
job = self.manager.grab_job(self)
if job:
self.send_command("job_assign", {'handle':job.handle, 'func':job.func, 'arg':job.arg})
else:
self.send_command("no_job")
elif func == "pre_sleep":
if not self.manager.sleep(self):
self.wakeup()
elif func == "work_complete":
self.manager.work_complete(self, **args)
elif func == "work_fail":
self.manager.work_fail(self, **args)
# Text commands
elif func == "status":
status = self.manager.get_status(self)
for s in status:
self.send_buffered("%s\t%d\t%d\t%d\n" % (s['func'], s['num_jobs'], s['num_working'], s['num_workers']))
self.send_buffered(".\n")
elif func == "version":
from gearman import __version__
self.send_buffered("%s\n" % __version__)
elif func == "workers":
for client, state in self.manager.states.items():
# if not state.abilities:
# continue
self.send_buffered("%d %s %s : %s\n" % (client.socket.fileno(), client.addr[0], state.client_id, " ".join(state.abilities)))
self.send_buffered(".\n")
# elif func == "maxqueue":
#
# This sets the maximum queue size for a function. If no size is
# given, the default is used. If the size is negative, then the queue
# is set to be unlimited. This sends back a single line with "OK".
#
# Arguments:
# - Function name.
# - Optional maximum queue size.
#
elif func == "shutdown":
# TODO: optional "graceful" argument - close listening socket and let all existing connections complete
self.server.stop()
else:
logging.error("Unhandled command %s: %s" % (func, args))
def handle_write(self):
if len(self.out_buffer) == 0:
return 0
try:
nsent = self.send(self.out_buffer)
except socket.error, e:
self.close()
return
self.out_buffer = buffer(self.out_buffer, nsent)
def send_buffered(self, data):
self.out_buffer += data
def send_command(self, name, kwargs={}):
self.send_buffered(pack_command(name, **kwargs))
def wakeup(self):
self.send_command('noop')
def work_complete(self, handle, result):
self.send_command('work_complete', {'handle':handle, 'result':result})
def work_fail(self, handle):
self.send_command('work_fail', {'handle':handle})
class Job(object):
def __init__(self, owner, handle, func, arg, bg=False, high=False, uniq=None):
self.owner = owner
self.handle = handle
self.func = func
self.arg = arg
self.bg = bg
self.high = high
self.uniq = uniq
self.worker = None
self.timeout = None
class ClientState(object):
def __init__(self, client):
self.client = client
self.sleeping = False
self.client_id = "-"
# Clients
self.jobs = []
# Workers
self.abilities = {}
self.working = []
class GearmanTaskManager(object):
def __init__(self):
self.max_id = 0
self.states = {} # {client: ClientState}
self.jobqueue = {} # {function, [job]}
self.jobs = {} # {handle: job}
self.uniq_jobs = {} # {function: {uniq: job}}
self.workers = {} # {function: [state]}
self.working = set() # set([job])
def add_job(self, client, func, arg, uniq=None, high=False, bg=False):
state = self.states[client]
job = Job(state, self.new_handle(), func=func, arg=arg, uniq=uniq, high=False, bg=False)
state.jobs.append(job)
if func not in self.jobqueue:
self.jobqueue[func] = deque([job])
else:
self.jobqueue[func].append(job)
self.jobs[job.handle] = job
workers = self.workers.get(func, [])
for w in workers:
if w.sleeping:
w.client.wakeup()
return job.handle
def can_do(self, client, func, timeout=None):
state = self.states[client]
state.abilities[func] = int(timeout) if timeout else None
if func not in self.workers:
self.workers[func] = set((state,))
else:
self.workers[func].add(state)
def cant_do(self, client, func):
state = self.states[client]
state.abilities.pop(func, None)
self.workers[func].pop(state, None)
def grab_job(self, client, grab=True):
state = self.states[client]
abilities = state.abilities.keys()
random.shuffle(abilities)
for f in abilities:
jobs = self.jobqueue.get(f)
if jobs:
if not grab:
return True
job = jobs.popleft()
job.worker = state
timeout = state.abilities[f]
job.timeout = time.time() + timeout if timeout else None
self.working.add(job)
state.working.append(job)
return job
return None
def sleep(self, client):
has_job = self.grab_job(client, False)
if has_job:
return False
state = self.states[client]
state.sleeping = True
return True
def work_complete(self, client, handle, result):
job = self.jobs[handle]
job.owner.client.work_complete(handle, result)
self._remove_job(job)
def work_fail(self, client, handle):
job = self.jobs[handle]
job.owner.client.work_fail(handle)
self._remove_job(job)
def _remove_job(self, job):
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
共29个文件
py:23个
in:2个
_authors:1个
资源分类:Python库 所属语言:Python 资源全名:gearman-1.3.1.tar.gz 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059
资源详情
资源评论
资源推荐
收起资源包目录
gearman-1.3.1.tar.gz (29个子文件)
gearman-1.3.1
MANIFEST.in 69B
PKG-INFO 439B
._AUTHORS 186B
LICENSE 1KB
._setup.py 186B
AUTHORS 99B
setup.py 524B
._tests.py 187B
gearman
server.py 10KB
.___init__.py 187B
manager.py 2KB
task.py 3KB
._task.py 187B
._manager.py 185B
._connection.py 187B
._server.py 188B
util.py 52B
client.py 7KB
compat.py 209B
._worker.py 187B
__init__.py 912B
connection.py 6KB
._protocol.py 186B
protocol.py 3KB
worker.py 6KB
._client.py 186B
._compat.py 186B
tests.py 5KB
._MANIFEST.in 185B
共 29 条
- 1
挣扎的蓝藻
- 粉丝: 12w+
- 资源: 15万+
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
评论0