<?php
/**********************************************************\
| |
| hprose |
| |
| Official WebSite: http://www.hprose.com/ |
| http://www.hprose.org/ |
| |
\**********************************************************/
/**********************************************************\
* *
* Hprose/Swoole/WebSocket/Client.php *
* *
* hprose swoole websocket client library for php 5.3+ *
* *
* LastModified: Nov 18, 2016 *
* Author: Ma Bingyao <andot@hprose.com> *
* *
\**********************************************************/
namespace Hprose\Swoole\WebSocket;
use stdClass;
use Exception;
use Hprose\Future;
use Hprose\TimeoutException;
use swoole_http_client;
class Client extends \Hprose\Client {
/** @var mixed Not assigned anywhere in the visible part of this class. */
public $type;
/** @var string Host component of the service URI (filled in by setUri). */
public $host = '';
/** @var string Address given to the Swoole client: $host itself when it is an IP literal, otherwise its resolved address. */
public $ip = '';
/** @var int Port of the service URI; setUri falls back to 80 (ws) or 443 (wss). */
public $port = 80;
/**
 * @var string Request path used for the WebSocket upgrade.
 *
 * Declared explicitly because setUri() assigns it and connect() reads it;
 * without a declaration it is a dynamic property, which is deprecated as
 * of PHP 8.2 and undefined until setUri() has run.
 */
public $path = '/';
/** @var bool True when the URI scheme is wss. */
public $ssl = false;
/** @var bool Whether the connection is kept open once no request is in flight. */
public $keepAlive = true;
/** @var int Value sent in the Keep-Alive header (presumably seconds -- TODO confirm). */
public $keepAliveTimeout = 300;
/** @var array Request headers passed to the Swoole client when connecting. */
private $header = array();
/** @var int Last request id handed out by getNextId(). */
private $id = 0;
/** @var int Number of requests currently counted as in flight. */
private $count = 0;
/** @var array Pending result futures, keyed by request id. */
private $futures = array();
/** @var array Queued requests waiting for a free slot; each entry is array(id, payload). */
private $requests = array();
/** @var Future|null Created by connect() and resolved by its upgrade callback. */
private $ready = null;
/** @var bool True from the start of connect() until its upgrade callback runs. */
private $connecting = false;
/** @var swoole_http_client|null Underlying connection, or null when none has been created or it was closed. */
private $ws = null;
/**
 * Creates the client.
 *
 * @param mixed $uris Forwarded unchanged to the parent constructor, which
 *                    defines the accepted forms.
 */
public function __construct($uris = null) {
    parent::__construct($uris);
}
/**
 * Sets or removes a custom request header.
 *
 * Content-Type, Content-Length and Host (matched case-insensitively) are
 * reserved and silently ignored. Any falsy $value removes the header
 * stored under exactly $name instead of setting it.
 *
 * @param string $name  Header name, stored with the caller's casing.
 * @param mixed  $value Header value; a falsy value removes the header.
 */
public function setHeader($name, $value) {
    $reserved = array('content-type', 'content-length', 'host');
    if (in_array(strtolower($name), $reserved, true)) {
        return;
    }
    if (!$value) {
        unset($this->header[$name]);
        return;
    }
    $this->header[$name] = $value;
}
/**
 * Turns keep-alive on or off and updates the related request headers.
 *
 * The header name used to be misspelled "Keep-Ailve" here, in
 * setKeepAliveTimeout() and in setUri(), so no real Keep-Alive header was
 * ever produced.
 * NOTE(review): the value is the bare timeout number; the usual header
 * syntax is "timeout=N" -- confirm what the server expects.
 *
 * @param bool $keepAlive True to keep the connection open.
 */
public function setKeepAlive($keepAlive = true) {
    $this->keepAlive = $keepAlive;
    $this->header['Connection'] = $keepAlive ? 'keep-alive' : 'close';
    if ($keepAlive) {
        $this->header['Keep-Alive'] = $this->keepAliveTimeout;
    }
    else {
        unset($this->header['Keep-Alive']);
    }
}
/**
 * @return bool Current keep-alive setting.
 */
public function isKeepAlive() {
    return $this->keepAlive;
}
/**
 * Stores the keep-alive timeout and, while keep-alive is enabled, mirrors
 * it into the Keep-Alive request header.
 *
 * @param int $timeout Timeout value (presumably seconds -- TODO confirm).
 */
public function setKeepAliveTimeout($timeout) {
    $this->keepAliveTimeout = $timeout;
    if ($this->keepAlive) {
        $this->header['Keep-Alive'] = $timeout;
    }
}
/**
 * @return int Current keep-alive timeout.
 */
public function getKeepAliveTimeout() {
    return $this->keepAliveTimeout;
}
/**
 * @return string Host component of the current URI.
 */
public function getHost() {
    return $this->host;
}
/**
 * @return int Port of the current URI.
 */
public function getPort() {
    return $this->port;
}
/**
 * @return bool True when the current URI uses the wss scheme.
 */
public function isSSL() {
    return $this->ssl;
}
/**
 * Applies a ws:// or wss:// URI: records host, port, path and TLS flag,
 * refreshes the Host / Connection / Keep-Alive headers and resolves the
 * host name to an address for the Swoole client.
 *
 * @param string $uri Service URI.
 * @throws Exception When the URI cannot be parsed, has no host, uses a
 *                   scheme other than ws/wss, or the host does not resolve.
 */
protected function setUri($uri) {
    parent::setUri($uri);
    $p = parse_url($uri);
    if (!$p) {
        throw new Exception("Can't parse this uri: " . $uri);
    }
    // parse_url() omits components that are absent, so guard both indexes
    // instead of raising notices and carrying on with a null host.
    $scheme = isset($p['scheme']) ? strtolower($p['scheme']) : '';
    if ($scheme !== 'ws' && $scheme !== 'wss') {
        throw new Exception("Only support ws and wss scheme");
    }
    if (!isset($p['host'])) {
        throw new Exception("Can't parse this uri: " . $uri);
    }
    $this->ssl = ($scheme === 'wss');
    $this->host = $p['host'];
    $this->port = isset($p['port']) ? $p['port'] : ($this->ssl ? 443 : 80);
    $this->path = isset($p['path']) ? $p['path'] : '/';
    $this->header['Host'] = $this->host;
    $this->header['Connection'] = $this->keepAlive ? 'keep-alive' : 'close';
    if ($this->keepAlive) {
        $this->header['Keep-Alive'] = $this->keepAliveTimeout;
    }
    if (filter_var($this->host, FILTER_VALIDATE_IP) === false) {
        // gethostbyname() returns its argument unchanged on failure.
        $ip = gethostbyname($this->host);
        if ($ip === $this->host) {
            throw new Exception('DNS lookup failed');
        }
        $this->ip = $ip;
    }
    else {
        $this->ip = $this->host;
    }
}
/**
 * Closes the underlying connection and forgets it.
 *
 * Does nothing when no connection object exists or when it reports that it
 * is not connected; in the latter case the object is kept.
 */
public function close() {
    $socket = $this->ws;
    if ($socket === null || !$socket->isConnected()) {
        return;
    }
    $socket->close();
    $this->ws = null;
}
/**
 * Runs $callback from a one-shot Swoole timer and exposes its outcome.
 *
 * $interval is multiplied by 1000 before being passed to
 * swoole_timer_after(), so it is presumably expressed in seconds.
 *
 * @param int|float $interval Delay before the callback runs.
 * @param callable  $callback Invoked through Future\sync(); its result is
 *                            piped into the returned future via fill().
 * @return Future Future filled from the callback's result.
 */
protected function wait($interval, $callback) {
    $result = new Future();
    $delay = $interval * 1000;
    $fire = function() use ($result, $callback) {
        Future\sync($callback)->fill($result);
    };
    swoole_timer_after($delay, $fire);
    return $result;
}
/**
 * Hands out the next request id.
 *
 * Ids count upwards one at a time; once the stored id has reached
 * 0x7FFFFFFF the counter is reset and 0 is returned.
 *
 * @return int The new id, which is also stored in $this->id.
 */
private function getNextId() {
    if ($this->id >= 0x7FFFFFFF) {
        $this->id = 0;
        return 0;
    }
    $this->id += 1;
    return $this->id;
}
/**
 * Creates a new swoole_http_client, wires its event handlers and starts the
 * WebSocket upgrade on $this->path.
 *
 * Sets $this->connecting and replaces $this->ready with a fresh Future; the
 * upgrade callback clears the flag and resolves that Future. Each message
 * is expected to start with a 4-byte big-endian request id followed by the
 * payload, which is how responses are matched to entries in $this->futures.
 *
 * The closures work on references to the object's fields (and on $self)
 * rather than on $this -- presumably for PHP 5.3 compatibility, where
 * closures cannot use $this.
 */
private function connect() {
$this->connecting = true;
$connecting = &$this->connecting;
$this->ready = new Future();
$ready = &$this->ready;
$futures = &$this->futures;
$count = &$this->count;
$requests = &$this->requests;
$ws = new swoole_http_client($this->ip, $this->port, $this->ssl);
// On a socket error, fail every pending request with the same exception
// and reset the in-flight bookkeeping.
// NOTE(review): $ready, $connecting and $requests are left untouched here --
// confirm that callers waiting on $ready cannot hang after a failed connect.
$ws->on('error', function($ws) use (&$futures, &$count) {
$error = new Exception(socket_strerror($ws->errCode));
foreach ($futures as $future) {
$future->reject($error);
}
$futures = array();
$count = 0;
});
// Accumulates the data of frames that are not marked as finished.
$buffer = '';
$self = $this;
$ws->on('message', function($ws, $frame) use ($self, &$buffer, &$futures, &$count, &$requests, &$ready) {
if ($frame->finish) {
$data = $buffer . $frame->data;
$buffer = '';
// The first four bytes carry the request id ('N' = unsigned 32-bit, big endian).
list(, $id) = unpack('N', substr($data, 0, 4));
if (isset($futures[$id])) {
$future = $futures[$id];
unset($futures[$id]);
--$count;
$future->resolve(substr($data, 4));
}
// A response arrived: if fewer than 100 requests are in flight and some
// are queued, dispatch one of them.
if (($count < 100) && count($requests) > 0) {
++$count;
// NOTE(review): array_pop() takes the most recently appended entry; if
// requests are appended with [] this dequeues newest-first -- confirm
// against the enqueueing code.
$request = array_pop($requests);
$ready->then(function() use ($ws, $request, &$futures) {
// $request is array(id, payload); the id is prefixed to the payload on the wire.
$id = $request[0];
$data = pack('N', $id) . $request[1];
if ($ws->push($data, WEBSOCKET_OPCODE_BINARY, true) === false) {
// NOTE(review): the future is rejected but stays in $futures and $count
// is not decremented -- confirm this does not leak a slot.
if (isset($futures[$id])) {
$error = new Exception(socket_strerror($ws->errCode));
$futures[$id]->reject($error);
}
}
});
}
// Nothing left in flight: drop the connection unless keep-alive is on.
if ($count === 0) {
if (!$self->keepAlive) $ws->close();
}
}
else {
$buffer .= $frame->data;
}
});
// $this->timeout is not declared in the visible part of this class
// (presumably inherited, in milliseconds -- TODO confirm); it is divided
// by 1000 for the Swoole 'timeout' option.
$ws->set(array('keep_alive' => $this->keepAlive,
'timeout' => $this->timeout / 1000));
$ws->setHeaders($this->header);
$this->ws = $ws;
// NOTE(review): the callback takes no arguments and resolves $ready
// unconditionally -- confirm whether a failed handshake also lands here.
$this->ws->upgrade($this->path, function() use (&$connecting, &$ready) {
$connecting = false;
$ready->resolve(null);
});
}
protected function sendAndReceive($request, stdClass $context) {
$future = new Future();
$id = $this->getNextId();
$count = &$this->count;
$futures = &$this->futures;
$futures[$id] = $future;
if ($context->timeout > 0) {
$timeoutFuture = new Future();
$timer = swoole_timer
没有合适的资源?快使用搜索试试~ 我知道了~
Hprose 全名是高性能远程对象服务引擎
共45个文件
php:38个
json:2个
sh:1个
需积分: 3 0 下载量 187 浏览量
2022-11-11
14:08:09
上传
评论
收藏 35KB ZIP 举报
温馨提示
Hprose 全名是高性能远程对象服务引擎。 它是一个新型的轻量级跨语言跨平台面向对象的高性能远程动态通讯中间件。它不但易学易用,而且功能强大。本项目是 Hprose 的 Swoole 版本实现
资源推荐
资源详情
资源评论
收起资源包目录
hprose-swoole-master.zip (45个子文件)
hprose-swoole-master
composer.json 1KB
.travis.yml 204B
install_ext.sh 432B
tests
Bootstrap.php 61B
phpunit.xml.dist 413B
src
Hprose
Swoole
WebSocket
Service.php 5KB
Client.php 9KB
Server.php 3KB
Http
Service.php 6KB
Client.php 5KB
Server.php 3KB
Transporter.php 6KB
Timer.php 1KB
Socket
FullDuplexTransporter.php 6KB
Service.php 6KB
Client.php 6KB
Server.php 4KB
HalfDuplexTransporter.php 5KB
Transporter.php 4KB
Client.php 4KB
Server.php 3KB
examples
composer.json 346B
src
TimeoutServer.php 437B
UnixServer.php 277B
TcpHDClient.php 1KB
WebSocketClient.php 1KB
HttpClientBenchmark.php 550B
WebSocketClientBenchmark.php 548B
TcpFDClientBenchmark.php 579B
TcpHDClientBenchmark.php 580B
TcpServer.php 278B
UnixClientBenchmark.php 546B
WebSocketServer.php 277B
HttpClient.php 1KB
PushServer.php 780B
PushClient.php 340B
UnixClient.php 1KB
promisify.php 319B
HttpServer.php 313B
TimePushClient.php 343B
TimeoutClient.php 588B
TimePushServer.php 329B
TcpFDClient.php 1KB
LICENSE.md 1KB
.gitignore 59B
共 45 条
- 1
资源评论
m0_72731342
- 粉丝: 2
- 资源: 1832
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功