<?php
namespace PhpAmqpLib\Channel;
use PhpAmqpLib\Channel\AbstractChannel;
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
use PhpAmqpLib\Helper\MiscHelper;
class AMQPChannel extends AbstractChannel
{
public $callbacks = array();
/**
 * Callback used in case of basic_return.
 *
 * The following parameters will be passed to the callback:
 *   int         $reply_code
 *   string      $reply_text
 *   string      $exchange
 *   string      $routing_key
 *   AMQPMessage $msg
 *
 * Defaults to null (no callback set).
 *
 * @var callable|null
 */
protected $basic_return_callback = null;
/**
 * Create a channel on the given connection and open it right away.
 *
 * @param object   $connection  connection this channel belongs to; asked for
 *                              a free channel id when none is supplied
 * @param int|null $channel_id  channel id to use, or null to let the
 *                              connection choose one
 * @param bool     $auto_decode stored on the channel as $this->auto_decode
 */
public function __construct($connection, $channel_id = null, $auto_decode = true)
{
    // Loose comparison: null, 0 and "" all trigger automatic id allocation.
    if ($channel_id == null) {
        $channel_id = $connection->get_free_channel_id();
    }

    parent::__construct($connection, $channel_id);

    if ($this->debug) {
        MiscHelper::debug_msg("using channel_id: " . $channel_id);
    }

    // Initial channel state; the channel counts as closed until x_open()
    // below has completed.
    $this->auto_decode = $auto_decode;
    $this->callbacks = array();
    $this->alerts = array();
    $this->active = true; // Flow control
    $this->is_open = false;
    $this->default_ticket = 0;

    $this->x_open();
}
/**
 * Destructor. Currently does nothing: the channel is not closed
 * automatically when the object is destroyed.
 */
public function __destruct()
{
// TODO: decide whether the channel should be closed here while a
// connection is still attached. The original sketch was:
//   if ($this->connection)
//       $this->close("destroying channel");
// NOTE(review): close() takes $reply_code as its first parameter, so the
// string in that sketch would be passed as the reply code - confirm the
// intended arguments before enabling it.
}
/**
 * Tear down this object, after we've agreed to close with the server.
 *
 * Marks the channel as closed, removes it from the connection's channel
 * table and then drops the references to the connection and channel id.
 */
protected function do_close()
{
    $this->is_open = false;

    unset($this->connection->channels[$this->channel_id]);

    // Same order as a chained assignment: connection first, then the id.
    $this->connection = null;
    $this->channel_id = null;
}
/**
 * Only for AMQP0.8.0
 *
 * Handles a non-fatal warning sent by the server. Such warnings are used
 * for methods that are normally asynchronous and therefore have no
 * confirmation, but for which the server may still detect errors that
 * need to be reported. Fatal errors are handled as channel or connection
 * exceptions; non-fatal ones arrive through this method.
 *
 * Each alert is appended to $this->alerts as
 * array(reply_code, reply_text, details).
 *
 * @param object $args reader supplying read_short(), read_shortstr()
 *                     and read_table()
 */
protected function channel_alert($args)
{
    $code = $args->read_short();
    $text = $args->read_shortstr();
    $table = $args->read_table();

    $this->alerts[] = array($code, $text, $table);
}
/**
 * Request a channel close.
 *
 * Sends channel.close to the server and waits for channel.close_ok.
 *
 * do_close() leaves $this->connection set to null once the channel has
 * been torn down. In that state there is no connection left to talk to,
 * so the call is treated as a no-op and returns null instead of trying
 * to send a frame.
 *
 * @param int    $reply_code forwarded to the channel.close frame writer
 * @param string $reply_text forwarded to the channel.close frame writer
 * @param array  $method_sig array(class_id, method_id) forwarded to the
 *                           channel.close frame writer
 *
 * @return mixed result of waiting for channel.close_ok, or null when the
 *               channel was already torn down
 */
public function close($reply_code = 0, $reply_text = "", $method_sig = array(0, 0))
{
    // Already closed: nothing to send and nothing to wait for.
    if ($this->connection === null) {
        return null;
    }

    list($class_id, $method_id, $args) = $this->protocolWriter->channelClose(
        $reply_code,
        $reply_text,
        $method_sig[0],
        $method_sig[1]
    );
    $this->send_method_frame(array($class_id, $method_id), $args);

    return $this->wait(array(
        $this->waitHelper->get_wait('channel.close_ok')
    ));
}
/**
 * Handle a channel.close sent by the peer.
 *
 * Reads the close reason from $args, replies with method frame (20, 41),
 * tears the channel down and finally reports the close by throwing.
 *
 * @param object $args reader supplying read_short() and read_shortstr()
 *
 * @throws AMQPProtocolChannelException always, carrying the reply code,
 *         the reply text and array(class_id, method_id) read from $args
 */
protected function channel_close($args)
{
    $code = $args->read_short();
    $text = $args->read_shortstr();
    // Elements are evaluated left to right: class id first, then method id.
    $failed_method = array($args->read_short(), $args->read_short());

    // (20, 41) is channel.close-ok in the AMQP method tables.
    $this->send_method_frame(array(20, 41));
    $this->do_close();

    throw new AMQPProtocolChannelException($code, $text, $failed_method);
}
/**
 * Confirm a channel close.
 *
 * Tears the channel down via do_close(). $args is not read.
 *
 * @param mixed $args unused
 */
protected function channel_close_ok($args)
{
$this->do_close();
}
/**
 * Enable/disable flow from peer.
 *
 * Sends the frame built by the channelFlow() writer and waits for
 * channel.flow_ok.
 *
 * @param mixed $active value handed to the channelFlow() frame writer
 *
 * @return mixed whatever wait() yields for channel.flow_ok
 */
public function flow($active)
{
    list($class, $method, $payload) = $this->protocolWriter->channelFlow($active);
    $this->send_method_frame(array($class, $method), $payload);

    $expected = array($this->waitHelper->get_wait('channel.flow_ok'));

    return $this->wait($expected);
}
/**
 * Handle a flow request from the peer.
 *
 * Stores the bit read from $args in $this->active and acknowledges it
 * through x_flow_ok().
 *
 * @param object $args reader supplying read_bit()
 */
protected function channel_flow($args)
{
    $flag = $args->read_bit();

    $this->active = $flag;
    $this->x_flow_ok($flag);
}
/**
 * Send the reply to a flow request from the peer.
 *
 * NOTE(review): the frame is built with the same channelFlow() writer
 * that flow() uses, not with a dedicated flow-ok writer - confirm that
 * this is the intended method id for the acknowledgement.
 *
 * @param mixed $active value handed to the channelFlow() frame writer
 */
protected function x_flow_ok($active)
{
    list($class, $method, $payload) = $this->protocolWriter->channelFlow($active);

    $signature = array($class, $method);
    $this->send_method_frame($signature, $payload);
}
/**
 * Handle the flow_ok reply.
 *
 * @param object $args reader supplying read_bit()
 *
 * @return mixed the bit read from $args
 */
protected function channel_flow_ok($args)
{
return $args->read_bit();
}
/**
 * Open the channel unless it is already open.
 *
 * Sends the frame built by the channelOpen() writer and waits for
 * channel.open_ok.
 *
 * @param string $out_of_band value handed to the channelOpen() writer
 *
 * @return mixed result of waiting for channel.open_ok, or null when the
 *               channel was already open
 */
protected function x_open($out_of_band = "")
{
    if (!$this->is_open) {
        list($class, $method, $payload) = $this->protocolWriter->channelOpen($out_of_band);
        $this->send_method_frame(array($class, $method), $payload);

        $expected = array($this->waitHelper->get_wait('channel.open_ok'));

        return $this->wait($expected);
    }

    return null;
}
/**
 * Handle the open_ok reply: mark the channel as open.
 *
 * Emits a debug message when $this->debug is set. $args is not read.
 *
 * @param mixed $args unused
 */
protected function channel_open_ok($args)
{
$this->is_open = true;
if ($this->debug) {
MiscHelper::debug_msg("Channel open");
}
}
/**
 * Request an access ticket.
 *
 * All parameters are handed unchanged to the accessRequest() frame
 * writer; the method then waits for access.request_ok.
 *
 * @param mixed $realm
 * @param bool  $exclusive
 * @param bool  $passive
 * @param bool  $active
 * @param bool  $write
 * @param bool  $read
 *
 * @return mixed whatever wait() yields for access.request_ok
 */
public function access_request(
    $realm,
    $exclusive = false,
    $passive = false,
    $active = false,
    $write = false,
    $read = false
) {
    list($class, $method, $payload) = $this->protocolWriter->accessRequest(
        $realm,
        $exclusive,
        $passive,
        $active,
        $write,
        $read
    );
    $this->send_method_frame(array($class, $method), $payload);

    $expected = array($this->waitHelper->get_wait('access.request_ok'));

    return $this->wait($expected);
}
/**
 * Grant access to server resources.
 *
 * Reads the ticket from $args, remembers it as the channel's default
 * ticket and returns it.
 *
 * @param object $args reader supplying read_short()
 *
 * @return mixed the ticket now stored in $this->default_ticket
 */
protected function access_request_ok($args)
{
    $ticket = $args->read_short();
    $this->default_ticket = $ticket;

    return $this->default_ticket;
}
/**
 * Declare exchange, create if needed.
 *
 * $arguments and $ticket are first passed through getArguments() and
 * getTicket(); every value is then handed to the exchangeDeclare() frame
 * writer. Unless $nowait is set, the method waits for
 * exchange.declare_ok.
 *
 * @param mixed $exchange
 * @param mixed $type
 * @param bool  $passive
 * @param bool  $durable
 * @param bool  $auto_delete
 * @param bool  $internal
 * @param bool  $nowait      when true, return right after sending
 * @param mixed $arguments
 * @param mixed $ticket
 *
 * @return mixed result of waiting for exchange.declare_ok, or null when
 *               $nowait is set
 */
public function exchange_declare(
    $exchange,
    $type,
    $passive = false,
    $durable = false,
    $auto_delete = true,
    $internal = false,
    $nowait = false,
    $arguments = null,
    $ticket = null
) {
    $arguments = $this->getArguments($arguments);
    $ticket = $this->getTicket($ticket);

    list($class, $method, $payload) = $this->protocolWriter->exchangeDeclare(
        $ticket,
        $exchange,
        $type,
        $passive,
        $durable,
        $auto_delete,
        $internal,
        $nowait,
        $arguments
    );
    $this->send_method_frame(array($class, $method), $payload);

    if ($nowait) {
        return null;
    }

    $expected = array($this->waitHelper->get_wait('exchange.declare_ok'));

    return $this->wait($expected);
}
/**
 * Confirms an exchange declaration.
 *
 * Intentionally empty: nothing is read from $args and no state changes.
 *
 * @param mixed $args unused
 */
protected function exchange_declare_ok($args)
{
}
/**
 * Delete an exchange.
 *
 * $ticket is first passed through getTicket(); the values are then
 * handed to the exchangeDelete() frame writer. Unless $nowait is set,
 * the method waits for exchange.delete_ok.
 *
 * @param mixed $exchange
 * @param bool  $if_unused
 * @param bool  $nowait    when true, return right after sending
 * @param mixed $ticket
 *
 * @return mixed result of waiting for exchange.delete_ok, or null when
 *               $nowait is set
 */
public function exchange_delete($exchange, $if_unused = false, $nowait = false, $ticket = null)
{
    $ticket = $this->getTicket($ticket);

    list($class, $method, $payload) = $this->protocolWriter->exchangeDelete(
        $ticket,
        $exchange,
        $if_unused,
        $nowait
    );
    $this->send_method_frame(array($class, $method), $payload);

    if ($nowait) {
        return null;
    }

    $expected = array($this->waitHelper->get_wait('exchange.delete_ok'));

    return $this->wait($expected);
}
/**
 * Confirm deletion of an exchange.
 *
 * Intentionally empty: nothing is read from $args and no state changes.
 *
 * @param mixed $args unused
 */
protected function exchange_delete_ok($args)
{
}
/**
* bind dest exchange to source exchange
*/
public function exchange_bind($destination, $source, $routing_key="",
$nowait=false, $arguments=null, $ticket=null)
{
$arguments =
没有合适的资源?快使用搜索试试~ 我知道了~
php 连接rabbitmq例子含PhpAmqpLib库
共55个文件
php:53个
bin:2个
3星 · 超过75%的资源 需积分: 48 69 下载量 155 浏览量
2013-08-14
01:26:04
上传
评论 1
收藏 5.04MB RAR 举报
温馨提示
php 连接rabbitmq例子含PhpAmqpLib库,免执行composer
资源推荐
资源详情
资源评论
收起资源包目录
mq_lib_sample_php.rar (55个子文件)
worker.php 781B
vendor
videlalvaro
php-amqplib
autoload.php 182B
composer
autoload_classmap.php 146B
ClassLoader.php 7KB
autoload_namespaces.php 191B
autoload_real.php 1KB
send.php 538B
PhpAmqpLib
Message
AMQPMessage.php 923B
Exception
AMQPOutOfBoundsException.php 139B
AMQPRuntimeException.php 131B
AMQPException.php 1KB
AMQPProtocolException.php 1013B
AMQPChannelException.php 198B
AMQPProtocolConnectionException.php 112B
AMQPConnectionException.php 204B
AMQPProtocolChannelException.php 109B
AMQPTimeoutException.php 131B
AMQPExceptionInterface.php 77B
Connection
AMQPSocketConnection.php 670B
AMQPStreamConnection.php 1KB
AMQPLazyConnection.php 2KB
AMQPSSLConnection.php 1KB
AMQPConnection.php 113B
AbstractConnection.php 15KB
Wire
AMQPReader.php 9KB
IO
StreamIO.php 3KB
AbstractIO.php 240B
SocketIO.php 2KB
Constants080.php 4KB
AMQPWriter.php 7KB
GenericContent.php 4KB
Constants091.php 3KB
AMQPDecimal.php 752B
Channel
AMQPChannel.php 22KB
AbstractChannel.php 9KB
Helper
Protocol
Protocol091.php 12KB
Wait091.php 2KB
MethodMap080.php 3KB
Protocol080.php 18KB
Wait080.php 3KB
MethodMap091.php 2KB
MiscHelper.php 3KB
Tests
config.php 195B
Unit
WireTest.php 4KB
Wire
AMQPWriterTest.php 1KB
Helper
Protocol
Protocol091Test.php 7KB
Functional
Bug49Test.php 1KB
fixtures
data_1mb.bin 1024KB
data_4mb.bin 4MB
StreamPublishConsumeTest.php 292B
SocketPublishConsumeTest.php 292B
FileTransferTest.php 2KB
Bug40Test.php 2KB
AbstractPublishConsumeTest.php 3KB
bootstrap.php 94B
共 55 条
- 1
资源评论
- pxchuan2017-06-19跑不起来,报错
- newsn2014-07-08貌似运行不起来,报错............
- liang_04292015-11-04资源差,根本不是Thinkphp内嵌的 rabbitmq, 对我本次下载没有帮助,浪费4分
gavinli
- 粉丝: 1
- 资源: 10
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功