from multiprocessing import Queue
from cStringIO import StringIO
import logging
import uuid
import pika
from concurrent.futures import ThreadPoolExecutor
from thrift.TTornado import _Lock
from thrift.transport.TTransport import TTransportBase
from tornado import gen, ioloop
from tornado.concurrent import run_on_executor
from pika.adapters import TornadoConnection
import constant
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# NOTE(review): this forces DEBUG on the module logger at import time,
# regardless of the application's logging configuration -- confirm intended.
logger.setLevel(logging.DEBUG)
class TAMQPTornadoTransport(TTransportBase):
    """Thrift transport that exchanges frames over AMQP using pika + Tornado.

    Outgoing bytes are buffered by :meth:`write` and published as one AMQP
    message by :meth:`flush`.  Two publishing modes exist, selected by the
    ``properties`` constructor argument:

    * ``properties`` given: the frame is published on the default exchange
      to ``properties.reply_to`` with the same ``correlation_id``, and the
      delivery described by ``method`` (if any) is acknowledged.
    * ``properties`` omitted: the frame is published to
      ``exchange_name``/``routing_key`` with a fresh ``correlation_id`` and
      ``reply_to`` set to an exclusive callback queue declared by
      :meth:`assign_queue`.  Messages arriving on that queue are made
      available through :meth:`readFrame`.
    """

    # ``run_on_executor`` (used by waitForFrame) runs the decorated method
    # on this pool; it is a class attribute, so all instances share it.
    executor = ThreadPoolExecutor(10)

    def __init__(self, channel=None, exchange_name=constant.EXCHANGE_NAME,
                 routing_key=constant.ROUTING_KEY, properties=None,
                 method=None, io_loop=None, **kwargs):
        """Store the configuration; no connection is made until open().

        :param channel: an already-open pika channel to reuse, or None to
            let :meth:`open` create a connection and channel.
        :param exchange_name: exchange used when publishing without
            ``properties``.
        :param routing_key: routing key used when publishing without
            ``properties``.
        :param properties: properties of a received message; when set,
            :meth:`flush` publishes to its ``reply_to``.
        :param method: delivery method of a received message; when set
            together with ``properties``, :meth:`flush` acks its
            ``delivery_tag``.
        :param io_loop: Tornado IOLoop; defaults to the global instance.
        :param kwargs: only ``url`` is read (AMQP URL, defaults to
            ``constant.DEFAULT_URL``).
        """
        self._channel = channel
        self._exchange_name = exchange_name
        self._routing_key = routing_key
        self._wbuf = StringIO()
        self._properties = properties
        self._method = method
        self._url = kwargs.get('url', constant.DEFAULT_URL)
        self._reply_to = None
        self._lock = _Lock()
        self._callback = None
        self._connection = None
        self._consumer_tag = None
        # Bodies received on the callback queue, consumed by waitForFrame.
        self._callback_queue = Queue()
        self.io_loop = io_loop or ioloop.IOLoop.instance()

    @gen.engine
    def assign_queue(self):
        """Declare the exclusive callback queue and start consuming from it.

        Releases the lock taken in :meth:`open`, then invokes the callback
        passed to :meth:`open`, if any.
        """
        logger.info("Openning callback queue")
        result = yield gen.Task(self._channel.queue_declare,
                                exclusive=True)
        logger.info("Callback queue openned")
        self._reply_to = result.method.queue
        # The reply queue name is now known, so pending flush() calls that
        # are waiting on the lock may proceed.
        self._lock.release()
        self._consumer_tag = self._channel.basic_consume(self.on_reply_message,
                                                         self._reply_to)
        if self._callback:
            self._callback()

    def on_reply_message(self, _channel, method, properties, body):
        """Consumer callback: ack the delivery and queue its body."""
        if method:
            self._channel.basic_ack(delivery_tag=method.delivery_tag)
        self._callback_queue.put(body)

    def on_connection_open(self, _connection):
        """Connection-open callback: request a channel on the connection."""
        logger.info("Openning channel")
        self._connection.channel(on_open_callback=self.on_channel_open)

    def open(self, callback=None):
        """Open the transport, invoking ``callback`` (if given) when ready.

        If a channel was supplied to the constructor the callback is invoked
        immediately.  Otherwise a connection to the configured URL is
        started and the callback is invoked at the end of
        :meth:`assign_queue`.
        """
        logger.info("Openning AMQP transport")
        if self._channel is not None:
            logger.info("Already set")
            # ``callback`` defaults to None, so it must not be called blindly.
            if callback is not None:
                callback()
        else:
            logger.info("Openning connection")
            self._callback = callback
            self._connection = TornadoConnection(pika.URLParameters(self._url),
                                                 self.on_connection_open)
            # Take the lock without waiting on the returned future: it is
            # released by assign_queue(), so flush() waits until then.
            self._lock.acquire()

    @run_on_executor
    def waitForFrame(self):
        """Block (on the executor) until a reply body is available."""
        return self._callback_queue.get(True)

    @gen.coroutine
    def readFrame(self):
        """Coroutine resolving to the next body received on the reply queue."""
        result = yield self.waitForFrame()
        raise gen.Return(result)

    def on_channel_open(self, channel):
        """Channel-open callback: keep the channel and set up the reply queue."""
        logger.info("Channel openned")
        self._channel = channel
        self.assign_queue()

    def close(self):
        """Cancel the reply consumer (if any) and close the channel."""
        if self._channel:
            if self._consumer_tag:
                self._channel.basic_cancel(consumer_tag=self._consumer_tag,
                                           nowait=True)
            self._channel.close()

    def isOpen(self):
        """Return True once a channel is set on this transport."""
        return self._channel is not None

    def read(self, _):
        """Unsupported: incoming data is obtained through readFrame().

        :raises AssertionError: always.
        """
        # Raised explicitly (rather than ``assert False``) so the guard is
        # not stripped when Python runs with -O.
        raise AssertionError("wrong stuff")

    def write(self, buf):
        """Append ``buf`` to the pending outgoing frame."""
        self._wbuf.write(buf)

    @gen.engine
    def flush(self):
        """Publish the buffered frame as one AMQP message and reset the buffer."""
        # Snapshot and reset the buffer before any yield: while this call is
        # suspended waiting for the lock, further write() calls must go into
        # a fresh buffer instead of being merged into this frame.
        frame = self._wbuf.getvalue()
        self._wbuf = StringIO()

        if self._properties is not None:
            # Reply path: answer on the requester's reply queue, echoing
            # its correlation id.
            props = pika.BasicProperties(
                correlation_id=self._properties.correlation_id)
            self._channel.basic_publish(exchange='',
                                        routing_key=self._properties.reply_to,
                                        properties=props,
                                        body=frame)
            if self._method is not None:
                self._channel.basic_ack(
                    delivery_tag=self._method.delivery_tag)
        else:
            # Request path: wait until the reply queue has been assigned.
            with (yield self._lock.acquire()):
                props = pika.BasicProperties(correlation_id=str(uuid.uuid4()),
                                             reply_to=self._reply_to)
                self._channel.basic_publish(exchange=self._exchange_name,
                                            routing_key=self._routing_key,
                                            properties=props,
                                            body=str(frame))
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
thrift_amqp_tornado-0.0.2.tar.gz (6个子文件)
thrift_amqp_tornado-0.0.2
PKG-INFO 310B
thrift_amqp_tornado
server.py 3KB
constant.py 172B
__init__.py 82B
transport.py 4KB
setup.py 352B
共 6 条
- 1
资源评论
挣扎的蓝藻
- 粉丝: 12w+
- 资源: 15万+
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功