<?php
/*
+----------------------------------------------------------------------+
| Copyright IBM Corporation 2006, 2007. |
| All Rights Reserved. |
+----------------------------------------------------------------------+
| |
| Licensed under the Apache License, Version 2.0 (the "License"); you |
| may not use this file except in compliance with the License. You may |
| obtain a copy of the License at |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| implied. See the License for the specific language governing |
| permissions and limitations under the License. |
+----------------------------------------------------------------------+
| Author: Dave Renshaw |
+----------------------------------------------------------------------+
$Id: sam_mqtt.php,v 1.1 2007/02/02 15:36:46 dsr Exp $
*/
/* Option marker: Connect() searches its options array for this value to decide the cleanstart flag. */
define("SAM_MQTT_CLEANSTART", "SAM_MQTT_CLEANSTART");
/* Quality-of-service option name. Not referenced in this part of the file - presumably read by the
   send/subscribe code (TODO confirm). */
define("SAM_MQTT_QOS", "SAM_MQTT_QOS");
/* Delimiter between the subscriber id and the topic inside the subscription id string that
   Receive() splits apart. */
define("SAM_MQTT_SUB_SEPARATOR", "#-#");
/* ---------------------------------
SAMConnection
--------------------------------- */
class SAMConnection_MQTT {
/* When true, the methods emit entry/trace/exit output through the e(), t() and x() helpers. */
var $debug = false;
/* Numeric code and message of the most recent error (set, for example, by Receive() when the
   subscription id is not valid). */
var $errno = 0;
var $error = '';
/*
Info we need to keep between calls...
*/
/* Subscriber id portion of the subscription id currently in use; assigned by Receive(). */
var $sub_id = '';
/* Port and host of the broker; assigned by Connect(), which falls back to 1883 and 'localhost'. */
var $port = '';
var $host = '';
/* Assigned by Connect() depending on whether SAM_MQTT_CLEANSTART was supplied in its options. */
var $cleanstart = false;
/* Set by Connect() from the result of checkHost(); Connect() itself does not assign $sock. */
var $virtualConnected = false;
/* Tested by Disconnect(), IsConnected() and Receive(); presumably set to true where the socket
   session is actually opened (that code is not visible here - TODO confirm). */
var $connected = false;
/*
Our current open socket...
Written to, read from and closed by Disconnect(); given a read timeout by Receive().
*/
var $sock;
/*
Table of available operations using the MQTT protocol...
Maps an operation name to a numeric code. Disconnect() passes the name "MQTT_DISCONNECT" to
fixed_header(), which presumably looks the code up in this table (TODO confirm).
NOTE(review): "MQTT_PINGREC" looks like the message the MQTT specification calls PINGREQ. The key
is runtime data, so confirm every lookup before renaming it.
*/
var $operations = array("MQTT_CONNECT" => 1,
"MQTT_CONNACK" => 2,
"MQTT_PUBLISH" => 3,
"MQTT_PUBACK" => 4,
"MQTT_PUBREC" => 5,
"MQTT_PUBREL" => 6,
"MQTT_PUBCOMP" => 7,
"MQTT_SUBSCRIBE" => 8,
"MQTT_SUBACK" => 9,
"MQTT_UNSUBSCRIBE" => 10,
"MQTT_UNSUBACK" => 11,
"MQTT_PINGREC" => 12,
"MQTT_PINGRESP" => 13,
"MQTT_DISCONNECT" => 14);
/* ---------------------------------
Constructor
--------------------------------- */
function SAMConnection_MQTT() {
    /*
       Old-style constructor (method named after the class). There is no state to
       initialise; it only emits the entry/exit trace pair when debugging is on.
    */
    if ($this->debug) {
        e('SAMConnection_MQTT()');
        x('SAMConnection_MQTT()');
    }
}
/* ---------------------------------
Commit
--------------------------------- */
/*
   Transactions are not available over MQTT, so this always fails.

   Records error 100 on the connection object (errno/error properties) and
   returns false.
*/
function Commit() {
    if ($this->debug) e('SAMConnection_MQTT.Commit()');

    /* Store the failure on the object; plain local variables would be discarded
       on return and the caller could never see why the call failed. */
    $this->errno = 100;
    $this->error = 'Unsupported operation for MQTT protocol!';
    $rc = false;

    if ($this->debug) x("SAMConnection_MQTT.Commit() rc=$rc");
    return $rc;
}
/* ---------------------------------
Connect
--------------------------------- */
/*
   Record the target host/port and the cleanstart preference, then validate the
   target with checkHost(). Only the "virtual" connection flag is set here.

   $proto    protocol selector supplied by the caller (not used by this method).
   $options  optional settings:
               SAM_PORT => port (defaults to 1883 when missing or empty)
               SAM_HOST => host (defaults to 'localhost' when missing or empty)
               SAM_MQTT_CLEANSTART, either as a value in the array or as a
               key with a truthy value, requests a clean start.

   Returns true when checkHost() accepts the host/port, false otherwise.
*/
function Connect($proto, $options=array()) {
    if ($this->debug) e('SAMConnection_MQTT.Connect()');

    /* isset() avoids the undefined-key notice when an option is omitted; the loose
       comparison with '' keeps the original treatment of empty values. */
    if (isset($options[SAM_PORT]) && $options[SAM_PORT] != '') {
        $this->port = $options[SAM_PORT];
    } else {
        $this->port = 1883;
    }
    if (isset($options[SAM_HOST]) && $options[SAM_HOST] != '') {
        $this->host = $options[SAM_HOST];
    } else {
        $this->host = 'localhost';
    }

    /* Strict search: a loose in_array() also matches unrelated option values such as
       true (and 0 on older PHP versions), silently switching cleanstart on. The keyed
       form is accepted explicitly so that SAM_MQTT_CLEANSTART => true, which used to
       work only through that loose match, keeps working. */
    $this->cleanstart = in_array(SAM_MQTT_CLEANSTART, $options, true)
        || !empty($options[SAM_MQTT_CLEANSTART]);

    if ($this->debug) t("SAMConnection_MQTT.Connect() host=$this->host, port=$this->port, cleanstart=$this->cleanstart");

    $this->virtualConnected = $this->checkHost($this->host, $this->port) ? true : false;

    if ($this->debug) x("SAMConnection_MQTT.Connect() rc=$this->virtualConnected");
    return $this->virtualConnected;
}
/* ---------------------------------
Disconnect
--------------------------------- */
/*
   End the session. When a socket session is open, send an MQTT DISCONNECT
   message, wait briefly for the peer to react and then release the socket.

   Returns true when the connection object was (virtually) connected, false
   when there was nothing to disconnect.
*/
function Disconnect() {
    if ($this->debug) e('SAMConnection_MQTT.Disconnect()');
    $rc = false;
    if ($this->virtualConnected) {
        if ($this->connected && is_resource($this->sock)) {
            $msg = $this->fixed_header("MQTT_DISCONNECT").pack('C', 0);
            fwrite($this->sock, $msg);
            $response = fgets($this->sock, 128);
            if ($this->debug) t('SAMConnection_MQTT.Disconnect() response is '.strlen($response).' bytes');
            /* Always release the socket. Closing it only when the read came back empty
               leaked the handle whenever the peer still had data in flight, because the
               connection state is reset below regardless. */
            fclose($this->sock);
        }
        $this->sock = NULL;
        $this->virtualConnected = false;
        $this->connected = false;
        $rc = true;
    }
    if ($this->debug) x("SAMConnection_MQTT.Disconnect() rc=$rc");
    return $rc;
}
/* ---------------------------------
IsConnected
--------------------------------- */
/*
   Report whether the connected flag is currently set.
   Returns true or false.
*/
function IsConnected() {
    if ($this->debug) {
        e('SAMConnection_MQTT.IsConnected()');
    }

    /* Normalise whatever the flag holds to a strict boolean. */
    $state = $this->connected ? true : false;

    if ($this->debug) {
        x("SAMConnection_MQTT.IsConnected() rc=$state");
    }
    return $state;
}
/* ---------------------------------
Peek
--------------------------------- */
/*
   Peeking at a message is not available over MQTT, so this always fails.

   Records error 100 on the connection object (errno/error properties) and
   returns false.
*/
function Peek() {
    if ($this->debug) e('SAMConnection_MQTT.Peek()');

    /* Store the failure on the object; plain local variables would be discarded
       on return and the caller could never see why the call failed. */
    $this->errno = 100;
    $this->error = 'Unsupported operation for MQTT protocol!';
    $rc = false;

    if ($this->debug) x("SAMConnection_MQTT.Peek() rc=$rc");
    return $rc;
}
/* ---------------------------------
PeekAll
--------------------------------- */
/*
   Peeking at all messages is not available over MQTT, so this always fails.

   Records error 100 on the connection object (errno/error properties) and
   returns false.
*/
function PeekAll() {
    if ($this->debug) e('SAMConnection_MQTT.PeekAll()');

    /* Store the failure on the object; plain local variables would be discarded
       on return and the caller could never see why the call failed. */
    $this->errno = 100;
    $this->error = 'Unsupported operation for MQTT protocol!';
    $rc = false;

    if ($this->debug) x("SAMConnection_MQTT.PeekAll() rc=$rc");
    return $rc;
}
/* ---------------------------------
Receive
--------------------------------- */
function Receive($sub_id, $options=array()) {
if ($this->debug) e('SAMConnection_MQTT.Receive()');
$rc = false;
/* strip the topic from the rear of the subscription id... */
$x = strpos($sub_id, SAM_MQTT_SUB_SEPARATOR);
if (!$x) {
$this->errno = 279;
$this->error = 'Specified subscription id ('.$sub_id.') is not valid!';
return false;
}
$topic = substr($sub_id, $x + strlen(SAM_MQTT_SUB_SEPARATOR));
$si = substr($sub_id, 0, $x);
/* Are we already connected? */
if (!$this->connected) {
if ($this->debug) t('SAMConnection_MQTT.Receive() Not connected.');
/* No, so open up the connection... */
$this->sub_id = $si;
$rc = $this->do_connect_now();
} else {
/* We are already connected. Are we using the right subscriber id? */
if ($this->sub_id != $si) {
if ($this->debug) t('SAMConnection_MQTT.Receive() Connected with wrong sub_id.');
/* No, We better reconnect then... */
$this->disconnect();
$this->sub_id = $si;
$rc = $this->do_connect_now();
} else {
if ($this->debug) t('SAMConnection_MQTT.Receive() Connected OK.');
$rc = true;
}
}
if ($rc) {
/* have we got a timeout specified? */
if ($options[SAM_WAIT] > 1) {
$m = $options[SAM_WAIT] % 1000;
$s = ($options[SAM_WAIT] - $m) /1000;
if ($this->debug) t('SAMConnection_MQTT.Receive() timeout='.$options[SAM_WAIT]." ($s secs $m millisecs)");
stream_set_timeout($this->sock, $s, $m);
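/* NOTE(review): stray web-page text ("Comments 16", "Latest resources") appeared here in place of
   PHP code; the rest of Receive() appears to be missing and must be restored from the original
   source. */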