self.socket.settimeout(timeout) # 超时设置
self.queue = queue.Queue() # 队列
self.helper = MessagePump.MPHelper(self) # 接收消息
# 运行主线程
def run(self):
self.helper.start() # 开启收消息的线程
while not self.abort:
message = self.waitForMessage() # 阻塞等待
self.owner.recvMessage(message) # 收取消息
# 等待消息
def waitForMessage(self):
try:
msg = self.queue.get(True, 3) # 抓取数据,最多等待3s
return msg
except:
return None
# 发送消息
def sendMessage(self, message):
bytes = pickle.dumps(message) # 转化为二进制
address = ("localhost", message.to) # 地址ip,端口(ip,port)
self.socket.sendto(bytes, address)
return True
#是否停止收取消息
def doAbort(self):
self.abort = True
再来一个消息处理器,再来一个消息处理器,模拟消息的传递,延迟,丢包,其实这个类没什么卵用,这个是为模拟测试准备的
from MessagePump import MessagePump
import random
class AdversarialMessagePump(MessagePump): # 类的继承
# 对抗消息传输,延迟消息并任意顺序传递,模拟网络的延迟,消息传送并不是顺序
def __init__(self, owner, port, timeout=2):
MessagePump.__init__(self, owner, port, timeout) # 初始化父类
self.messages = set() # 集合避免重复
def waitForMessage(self):
try:
msg = self.queue.get(True, 0.1) # 从队列抓取数据
self.messages.add(msg) # 添加消息
except Exception as e: # 处理异常
pass
# print(e)
if len(self.messages) > 0 and random.random() < 0.95: # Arbitrary!
msg = random.choice(list(self.messages)) # 随机抓取消息发送
self.messages.remove(msg) # 删除消息
else:
msg = None
return msg
再来一个是记录类再来一个是记录类
# InstanceRecord本地记录类,主要记录追随者、领导者最高编号的协议
from PaxosLeaderProtocol import PaxosLeaderProtocol
class InstanceRecord:
def __init__(self):
self.protocols = {}
self.highestID = (-1, -1) # (port,count)
self.value = None
def addProtocol(self, protocol):
self.protocols[protocol.proposalID] = protocol
#
if protocol.proposalID[1] > self.highestID[1] or (
protocol.proposalID[1] == self.highestID[1] and protocol.proposalID[0] > self.highestID[0]):
self.highestID = protocol.proposalID # 取得编号最大的协议
def getProtocol(self, protocolID):
return self.protocols[protocolID]
def cleanProtocols(self):
keys = self.protocols.keys()
for k in keys:
protocol = self.protocols[k]
if protocol.state == PaxosLeaderProtocol.STATE_ACCEPTED:
print("删除协议")
del self.protocols[k]
评论0
最新资源