#!/usr/bin/env python
# -*- coding: gb2312 -*-
import Queue, time, threading
from . import QueueInterface
class PriorityQueue(QueueInterface.QueueInterface):
class TaskNumber:
    '''Integer counter guarded by a condition variable.

    Inc/Dec adjust the counter while holding the condition's lock, and Wait
    blocks the calling thread until the counter is exactly zero.
    '''

    def __init__(self, task_numbers = 0):
        ''' create the counter
        @param task_numbers initial value of the counter
        '''
        self.__m_ts = task_numbers
        self.__m_cond = threading.Condition(threading.Lock())

    def Inc(self, number = 1):
        ''' add number to the counter
        @return the counter value before the change
        '''
        return self.__do_opt(number)

    def Dec(self, number = 1):
        ''' subtract number from the counter
        @return the counter value before the change
        '''
        return self.__do_opt(0 - number)

    def __do_opt(self, value):
        ''' apply a signed delta under the lock
        @param value amount added to the counter (negative to decrease)
        @return the counter value before the change
        '''
        self.__m_cond.acquire()
        try:
            prevalue = self.__m_ts
            self.__m_ts += value
            if self.__m_ts == 0:
                # Several threads may be blocked in Wait(); all of them are
                # waiting for the same "counter is zero" event, so wake every
                # one instead of a single waiter.  notify_all is looked up
                # first because notifyAll is the only spelling on old Pythons
                # and a deprecated alias on new ones.
                wake_all = getattr(self.__m_cond, 'notify_all', None) or self.__m_cond.notifyAll
                wake_all()
            return prevalue
        finally:
            # Always give the lock back, even if the arithmetic above raises
            # (for example when value is not a number).
            self.__m_cond.release()

    def getNumber(self):
        ''' return the current counter value (read without taking the lock) '''
        return self.__m_ts

    def Wait(self):
        ''' block until the counter is zero; returns at once if it already is '''
        self.__m_cond.acquire()
        try:
            # Re-check after every wake-up: the counter may have moved away
            # from zero again before this thread got the lock back.
            while self.__m_ts != 0:
                self.__m_cond.wait()
        finally:
            self.__m_cond.release()
def __init__(self, maxsize = 0):
    ''' create an empty priority queue
    @param maxsize capacity limit kept for the fullness checks; 0 means no limit
    '''
    QueueInterface.QueueInterface.__init__(self)
    # Element storage and its capacity limit.
    self.__m_elemlist = []
    self.__maxsize = maxsize
    # Counter of queued tasks, and the condition variable that serves as the
    # queue lock.
    self.__m_tasks = PriorityQueue.TaskNumber()
    self.__m_lock = threading.Condition()
def qsize(self):
    ''' return the number of elements currently stored (no lock is taken) '''
    elements = self.__m_elemlist
    return len(elements)
def qsize_cmp(self, cmp_function):
    ''' count the queued elements accepted by a predicate, under the queue lock
    @param cmp_function callable invoked once per element
    @return number of elements for which cmp_function(element) == True
    '''
    self.__lock_queue()
    try:
        value = 0
        for elem in self.__m_elemlist:
            # The comparison against True is kept on purpose: results that
            # are merely truthy (e.g. a non-empty string) are not counted,
            # exactly as before.
            if cmp_function(elem) == True:
                value += 1
        return value
    finally:
        # Release the lock even when cmp_function raises; otherwise every
        # later queue operation would block forever.
        self.__unlock_queue()
def empty(self):
    ''' return True when qsize() reports zero elements, False otherwise '''
    count = self.qsize()
    return count == 0
def full(self):
    ''' report whether the queue has reached its capacity
    @return False when maxsize is 0 (no limit), otherwise True once the
            element count is at or beyond maxsize
    '''
    limit = self.__maxsize
    if limit == 0:
        return False
    # ">=" rather than "==" so the answer matches the test put() applies
    # (room exists only while maxsize > element count).  With a negative
    # maxsize put() never finds room, so the queue is reported as full.
    return self.qsize() >= limit
def put(self, item, block = True, timeout = None):
    ''' insert an item into the queue
    @param item element to insert; its position is decided by __put
    @param block False: fail at once when the queue is full;
                 otherwise wait for a free slot
    @param timeout seconds to wait in blocking mode; None or a value <= 0
                   means wait without limit
    @raise Queue.Full the queue is full (non-blocking mode), or no slot
                      became free before the timeout expired

    Non-blocking:
      1. take the queue lock
      2. if the queue is full raise Queue.Full, otherwise insert the item
      3. release the lock
    Blocking:
      1. take the queue lock
      2. while the queue is full, wait on the condition (this releases the
         lock while waiting) until a slot is free or the timeout expires;
         on timeout raise Queue.Full
      3. insert the item according to its priority
      4. release the lock
    '''
    def has_room():
        # maxsize == 0 means the queue has no capacity limit.
        return self.__maxsize == 0 or self.__maxsize > len(self.__m_elemlist)

    if block == False:
        # non-blocking mode
        self.__lock_queue()
        try:
            if not has_room():
                raise Queue.Full
            self.__put(item)
            self.__notify_queue()
        finally:
            # try/finally so the lock is also released when __put raises.
            self.__unlock_queue()
        return

    if timeout is None or timeout <= 0:
        # blocking mode, unlimited wait
        self.__lock_queue()
        try:
            while not has_room():
                self.__wait_queue()
            self.__put(item)
            self.__notify_queue()
        finally:
            self.__unlock_queue()
        return

    # Blocking mode with a time limit.  The deadline is fixed once; deriving
    # the remaining time from a start stamp that is never refreshed would
    # subtract the elapsed time again on every retry and time out early.
    deadline = time.time() + timeout
    if self.__lock_queue(timeout) == False:
        # could not even obtain the lock in time
        raise Queue.Full
    try:
        while not has_room():
            remaining = deadline - time.time()
            if remaining <= 0:
                raise Queue.Full
            # Sleep on the condition instead of repeatedly unlocking and
            # relocking; the wait returns on a notification or after
            # `remaining` seconds, and the loop then re-checks for room.
            self.__m_lock.wait(remaining)
        self.__put(item)
        self.__notify_queue()
    finally:
        self.__unlock_queue()
def __lock_queue(self, timeout = 0):
    ''' lock the queue
    @param timeout time(unit seconds); a value <= 0 blocks until the lock
                   is obtained
    @return True when the lock was obtained, False when timeout seconds
            passed without obtaining it
    '''
    if timeout <= 0:
        self.__m_lock.acquire()
        return True
    deadline = time.time() + timeout
    delay = 0.0005
    while not self.__m_lock.acquire(False):
        remaining = deadline - time.time()
        if remaining <= 0:
            # timeout
            return False
        # Sleep between attempts instead of spinning at full CPU.  The pause
        # doubles each round, but is capped at 50 ms and never exceeds the
        # time that is left.
        delay = min(delay * 2, remaining, 0.05)
        time.sleep(delay)
    return True
def __unlock_queue(self):
    ''' unlock the queue (the caller must currently hold the lock) '''
    guard = self.__m_lock
    guard.release()
def __wait_queue(self):
    ''' wait on the queue condition, with no timeout, until it is notified '''
    guard = self.__m_lock
    guard.wait()
def __notify_queue(self):
    ''' wake every thread waiting on the queue condition

    put() and get() wait on this same condition, so waiters for "a slot is
    free" and waiters for "an element is available" share one wait set.
    Waking a single waiter could pick one whose condition has not changed
    and leave the one that could proceed asleep; waking all of them lets
    each re-check its own condition.
    '''
    guard = self.__m_lock
    # notify_all is looked up first because notifyAll is the only spelling
    # on old Pythons and a deprecated alias on new ones.
    wake_all = getattr(guard, 'notify_all', None) or guard.notifyAll
    wake_all()
def __put(self, item):
    ''' store an item, re-sort the element list and count one more task
    @param item element to store

    When the module named by item.__module__ can be imported and exposes an
    attribute called Cmp, that attribute is passed to sort() as the key;
    otherwise the elements are sorted by their default ordering.
    '''
    elements = self.__m_elemlist
    elements.append(item)

    # Resolve the module object for the item.  __import__ returns the
    # top-level package for a dotted name, so walk down the remaining
    # components.  Any failure simply means "no custom key".
    try:
        dotted = item.__module__
        owner = __import__(dotted)
        for part in dotted.split('.')[1:]:
            owner = getattr(owner, part)
    except Exception:
        owner = None

    if hasattr(owner, 'Cmp'):
        elements.sort(key = owner.Cmp)
    else:
        # default ordering
        elements.sort()
    self.__m_tasks.Inc()
def __get(self):
    ''' remove and return the element at the front of the element list '''
    elements = self.__m_elemlist
    return elements.pop(0)
def put_nowait(self, item):
    ''' insert an item without waiting; shorthand for put(item, False)
    @param item element to insert
    '''
    dont_block = False
    self.put(item, dont_block)
def get(self, block = True, timeout = None):
# 阻塞:
# 1. 上写锁
# 2. 获取队列是否为空
# 3. 如果为空,则释放写锁,并等待,直到队列中有元素或者超时
# 如果.超时,则抛出异常。否则转1.
# 4. 如果不为空,则根据优先顺序,取优先级最高的元素。
# 5. 释放写锁
# 6. 返回元素。
# 非阻塞:
# 1. 上写锁
# 2. 获取队列是否为空
# 3. 如果为空,则抛出异常,否则获取优先级最高的元素。
# 4. 释放写锁
# 5. 返回元素
if block == False:
# none blocking mode
self.__lock_queue()
if len(self.__m_elemlist) == 0:
# the queue is empty
self.__unlock_queue()
raise Queue.Empty
else:
# the queue is not empty
obj = self.__get()
self.__notify_queue()
self.__unlock_queue()
return obj
else:
# blocking mode
if timeout == None or timeout <= 0:
# infinite wait, if queue is empty
self.__lock_queue()
while len(self.__m_elemlist) == 0:
# the queue is empty
self.__wait_queue()
# the queue is not empty
obj = self.__get()
self.__notify_queue()
self.__unlock_queue()
return obj
else:
# wait time = timeout
now = time.time()
times = timeout
while (True):
ret = self.__lock_queue(times)
if ret == False:
raise Queue.Empty
else:
# lock succeed, check time
t = now + times - time.time()
if t <= 0:
没有合适的资源?快使用搜索试试~ 我知道了~
pyMsgComm python 消息库 框架
共108个文件
py:104个
jude:1个
doc:1个
需积分: 3 4 下载量 37 浏览量
2008-09-11
23:55:49
上传
评论
收藏 727KB ZIP 举报
温馨提示
This project is written in Python. Its main purpose is to facilitate client-server communication over a given protocol such as TCP or UDP. It provides a framework that makes client-server communication easier to write: you can implement your own client-server protocol simply by inheriting from a few well-defined classes. The client-side code is finished, but the server side is not. I hope that people who are interested in this project will take charge of it.
资源详情
资源评论
资源推荐
收起资源包目录
pyMsgComm python 消息库 框架 (108个子文件)
test.cfg 249B
详细设计.doc 956KB
MsgComm.jude 389KB
data.pkl 50B
PriorityQueue.py 9KB
test_PacketSender.py 8KB
CfgFile.py 8KB
test_SessionManager.py 7KB
PacketSender.py 7KB
test_scenario_1.py 7KB
test_PacketReceiver.py 6KB
test_PriorityQueue.py 6KB
PacketReceiver.py 6KB
TcpSocket.py 6KB
test_ResourceAllocator.py 5KB
DefaultRecvScheduler.py 5KB
test_TcpSocket.py 4KB
TimerEventManager.py 4KB
test_CfgFile.py 4KB
test_TimerEventManager.py 4KB
test_TcpSocketShort.py 4KB
SessionManager.py 3KB
InitialPacket.py 3KB
AckPacket.py 3KB
test_WorkerThread.py 3KB
ResourceManager.py 3KB
WorkerThread.py 3KB
PacketProperty.py 3KB
PacketAssembler.py 3KB
ThreadPool.py 3KB
test_DefaultSendScheduler.py 2KB
Communicator.py 2KB
test_InitialPacket.py 2KB
Packet.py 2KB
test_Communicator.py 2KB
test_DefaultRecvScheduler.py 2KB
DefaultSendScheduler.py 2KB
test_PacketAssembler.py 2KB
ResourceAllocator.py 2KB
test_queue.py 2KB
test_PacketProperty.py 2KB
test_AckPacket.py 2KB
test_QuitTimerEvent.py 2KB
test_thread.py 2KB
test_ThreadPoolManager.py 2KB
test_ResourceTypes.py 2KB
test_ThreadPool.py 2KB
TcpSocketShort.py 2KB
test_ResourceManager.py 2KB
test_TimerEvent.py 2KB
test_PacketRecvScheduler.py 2KB
test_PacketStatus.py 2KB
Logger.py 2KB
test_PacketSendScheduler.py 1KB
PacketAllocator.py 1KB
IntegerAllocator.py 1KB
StringAllocator.py 1KB
test_PacketHookFunction.py 1KB
test_PacketRetrans.py 1KB
PacketHookFunction.py 1KB
test_PacketIOMethod.py 1KB
regression.py 1KB
ResourceTypes.py 1KB
TimerEvent.py 1KB
queue.py 946B
StringPacket.py 888B
PacketStatus.py 883B
IntegerPacket.py 865B
test_Logger.py 825B
thread.py 824B
QueueInterface.py 535B
PacketRecvScheduler.py 531B
ThreadPoolManager.py 530B
PacketRetrans.py 525B
PacketIOMethod.py 514B
PacketSendScheduler.py 446B
Task.py 429B
QuitTimerEvent.py 357B
ErrorHandler.py 325B
__init__.py 229B
__init__.py 198B
ExitTask.py 179B
__init__.py 141B
__init__.py 135B
__init__.py 133B
__init__.py 131B
__init__.py 128B
__init__.py 127B
__init__.py 125B
__init__.py 119B
__init__.py 111B
__init__.py 109B
__init__.py 107B
__init__.py 101B
__init__.py 99B
__init__.py 99B
__init__.py 97B
__init__.py 96B
__init__.py 92B
__init__.py 89B
共 108 条
- 1
- 2
collide
- 粉丝: 15
- 资源: 7
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
评论0