<?php
namespace TrytoMqtt;
use TrytoMqtt\Protocols\Mqtt;
class Client
{
/**
* STATE_INITIAL.
*/
const STATE_INITIAL = 1;
/**
* STATE_CONNECTING
*/
const STATE_CONNECTING = 2;
/**
* STATE_WAITCONACK
*/
const STATE_WAITCONACK = 3;
/**
* STATE_ESTABLISHED
*/
const STATE_ESTABLISHED = 4;
/**
* STATE_DISCONNECT
*/
const STATE_DISCONNECT = 5;
/**
* DEFAULT_CLIENT_ID_PREFIX
*/
const DEFAULT_CLIENT_ID_PREFIX = 'swoole-mqtt-client';
/**
* MAX_TOPIC_LENGTH
*/
const MAX_TOPIC_LENGTH = 65535;
/**
* @var callable
*/
public $onConnect = null;
/**
* @var callable
*/
public $onReconnect = null;
/**
* @var callable
*/
public $onMessage = null;
/**
* @var callable
*/
public $onClose = null;
/**
* @var callable
*/
public $onError = null;
/**
* @var int
*/
protected $_state = 1;
/**
* @var int
*/
protected $_messageId = 1;
/**
* @var string
*/
protected $_remoteHost = '';
/**
* @var string
*/
protected $_remotePort = '';
/**
* @var AsyncTcpConnection
*/
protected $_connection = null;
/**
* @var boolean
*/
protected $_firstConnect = true;
/**
* ['topic'=>qos, ...]
* @var array
*/
protected $_resubscribeTopics = array();
/**
* @var int
*/
protected $_checkConnectionTimeoutTimer = 0;
/**
* @var int
*/
protected $_pingTimer = 0;
/**
* @var bool
*/
protected $_recvPingResponse = true;
/**
* @var bool
*/
protected $_doNotReconnect = false;
/**
* @var array
*/
protected $_outgoing = array();
/**
* 错误码
* @var array
*/
protected static $_errorCodeStringMap = array(
1 => 'Connection Refused, unacceptable protocol version',
2 => 'Connection Refused, identifier rejected',
3 => 'Connection Refused, Server unavailable',
4 => 'Connection Refused, bad user name or password',
5 => 'Connection Refused, not authorized',
100 => 'Connection closed',
101 => 'Connection timeout',
102 => 'Connection fail',
103 => 'Connection buffer full and close connection',
140 => 'No connection to broker',
240 => 'Invalid topic',
241 => 'Invalid qos',
);
/**
* 配置参数
* @var array
*/
protected $_options = array(
'clean_session' => 1, // set to 0 to receive QoS 1 and 2 messages while offline
'username' => '', // the username required by your broker
'password' => '', // the password required by your broker
'keepalive' => 50000, // default 50 seconds, set to 0 to disable
'protocol_name' => 'MQTT', // protocol name MQTT or MQIsdp
'protocol_level' => 4, // protocol level, MQTT is 4 and MQIsdp is 3
'reconnect_period' => 1000, // reconnect period default 1 second, set to 0 to disable
'connect_timeout' => 30000, // 30 seconds, time to wait before a CONNACK is received
'resubscribe' => true, // default true, if connection is broken and reconnects, subscribed topics are automatically subscribed again.
'bindto' => '', // bindto option, used to specify the IP address that PHP will use to access the network
'ssl' => false, // ssl context, see https://wiki.swoole.com/wiki/page/p-client_setting.html
'debug' => false, // debug
'socket_buffer_size' => 1024 * 1024 * 2, // 2M缓存区
'package_max_length' => 2000000, // 协议最大长度
);
/**
* 回调事件
* @var array
*/
protected $event = ['Connect', 'Receive', 'Close', 'BufferFull', 'BufferEmpty', 'Error'];
/**
* Client constructor.
* @param $address
* @param array $options
*/
public function __construct(string $host, int $port, $options = array())
{
$this->_remoteHost = $host;
$this->_remotePort = $port;
$this->setOptions($options);
$context = array();
if ($this->_options['ssl'] && is_array($this->_options['ssl'])) {
$context = array_merge($context, $this->_options['ssl']);
}
if ($this->_options['socket_buffer_size']) {
$context['socket_buffer_size'] = $this->_options['socket_buffer_size'];
}
if ($this->_options['package_max_length']) {
$context['package_max_length'] = $this->_options['package_max_length'];
}
$this->_connection = new \swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
$this->_connection->set($context);
$this->onReconnect = array($this, 'onMqttReconnect');
$this->onMessage = function () {};
}
/**
* connect
*/
public function connect()
{
$this->_doNotReconnect = false;
// 设置回调
foreach ($this->event as $event) {
if (method_exists($this, 'onConnection' . $event)) {
$this->_connection->on($event, [$this, 'onConnection' . $event]);
}
}
$this->_state = static::STATE_CONNECTING;
$this->_connection->connect($this->_remoteHost, $this->_remotePort);
$this->setConnectionTimeout($this->_options['connect_timeout']);
if ($this->_options['debug']) {
echo "-- Try to connect to {$this->_remoteHost}:{$this->_remotePort}", PHP_EOL;
}
}
/**
* reconnect
*
* @param int $after
*/
public function reconnect($after = 0)
{
$this->_doNotReconnect = false;
// 设置回调
foreach ($this->event as $event) {
if (method_exists($this, 'onConnection' . $event)) {
$this->_connection->on($event, [$this, 'onConnection' . $event]);
}
}
$this->_connection->connect($this->_remoteHost, $this->_remotePort);
$this->setConnectionTimeout($this->_options['connect_timeout'] + $after);
if ($this->_options['debug']) {
echo "-- Reconnect after $after seconds", PHP_EOL;
}
}
/**
* publish
*
* @param $topic
* @param $content
* @param array $options
* @param callable $callback
*/
public function publish($topic, $content, $options = array(), $callback = null)
{
if ($this->checkDisconnecting($callback)) {
return;
}
static::isValidTopic($topic);
$qos = 0;
$retain = 0;
$dup = 0;
if (isset($options['qos'])) {
$qos = $options['qos'];
if ($this->checkInvalidQos($qos, $callback)) {
return;
}
}
if (!empty($options['retain'])) {
$retain = 1;
}
if (!empty($options['dup'])) {
$dup = 1;
}
$package = array(
'cmd' => Mqtt::CMD_PUBLISH,
'topic' => $topic,
'content' => $content,
'retain' => $retain,
'qos' => $qos,
'dup' => $dup,
);
if ($qos) {
$package['message_id'] = $this->incrMessageId();
if ($callback) {
$this->_outgoing[$package['message_id']] = $callback;
}
}
if ($this->_options['debug']) {
$message_id = isset($package['message_id']) ? $package['message_id'] : '';
echo "↑ Send PUBLISH package, topic:$topic content:$content retain:$retain qos:$qos dup:$dup message_id
没有合适的资源?快使用搜索试试~ 我知道了~
基于swoole的异步mqtt 客户端库,可用于接收或者发送mqtt协议的消息
共8个文件
php:4个
gitignore:1个
json:1个
1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉
0 下载量 184 浏览量
2023-10-12
09:47:35
上传
评论
收藏 12KB ZIP 举报
温馨提示
一个基于swoole的异步mqtt 客户端库,可用于接收或者发送mqtt协议的消息。支持QoS 0、QoS 1、QoS 2。支持MQTT 3.1和3.1.1版本。 参考workerman-mqtt
资源推荐
资源详情
资源评论
收起资源包目录
swoole_mqtt-master.zip (8个子文件)
swoole_mqtt-master
_config.yml 26B
src
Protocols
Mqtt.php 15KB
Client.php 32KB
LICENSE 1KB
composer.json 381B
examples
subscribe.php 649B
publish.php 527B
.gitignore 31B
共 8 条
- 1
资源评论
Java程序员-张凯
- 粉丝: 1w+
- 资源: 6747
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- 服务器概述服务器概述服务器概述服务器概述.txt
- 华中农业大学python实验题.txt
- 海康威视相机采图交叉编译示例程序,c++
- DETR-基于Tensorflow实现DETR目标检测算法-附流程教程+项目源码-优质项目实战.zip
- 3d激光slam地图发布程序,3d地图点云处理,c++程序
- 送给妈妈的一束鲜花.zip(母亲节祝福HTML源码)
- 稀疏化DETR-基于Pytorch实现稀疏化DETR-SparseDETR-附流程教程+项目源码-优质项目实战.zip
- 人工分类:SLTM的微博评论二分类数据集
- (自适应手机端)响应式房产合同知识产权网站pbootcms模板 企业管理类网站源码下载.zip
- (自适应手机端)响应式动力刀座pbootcms网站模板 五金机械设备类网站源码下载.zip
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功