#include <QTimer>
#include <QTextStream>
#include <QStringList>
#include <QSslSocket>
#include <QtEndian>
#include "qamqpglobal.h"
#include "qamqpexchange.h"
#include "qamqpexchange_p.h"
#include "qamqpqueue.h"
#include "qamqpqueue_p.h"
#include "qamqpauthenticator.h"
#include "qamqptable.h"
#include "qamqpclient_p.h"
#include "qamqpclient.h"
// Constructs the private implementation object for the public client `q`.
// Connection parameters start from the compile-time defaults (AMQP_PORT,
// AMQP_HOST, AMQP_VHOST, AMQP_FRAME_MAX); all flags start cleared, the socket
// pointer starts null and the error state starts as QAMQP::NoError.
// The socket, timers and authenticator are not created here; init() does that.
QAmqpClientPrivate::QAmqpClientPrivate(QAmqpClient *q)
: port(AMQP_PORT),
host(AMQP_HOST),
virtualHost(AMQP_VHOST),
autoReconnect(false),
reconnectFixedTimeout(false),
timeout(0),
connecting(false),
useSsl(false),
socket(0),
closed(false),
connected(false),
channelMax(0),
heartbeatDelay(0),
frameMax(AMQP_FRAME_MAX),
error(QAMQP::NoError),
q_ptr(q)
{
// Register the message property hash with Qt's meta-type system
// (presumably so it can be carried by queued connections / QVariant).
qRegisterMetaType<QAmqpMessage::PropertyHash>();
}
// Destructor: performs no explicit cleanup. The socket and timers created in
// init()/initSocket() are constructed with the public client as QObject parent.
QAmqpClientPrivate::~QAmqpClientPrivate()
{
}
void QAmqpClientPrivate::init()
{
Q_Q(QAmqpClient);
initSocket();
heartbeatTimer = new QTimer(q);
QObject::connect(heartbeatTimer, SIGNAL(timeout()), q, SLOT(_q_heartbeat()));
reconnectTimer = new QTimer(q);
reconnectTimer->setSingleShot(true);
QObject::connect(reconnectTimer, SIGNAL(timeout()), q, SLOT(_q_connect()));
authenticator = QSharedPointer<QAmqpAuthenticator>(
new QAmqpPlainAuthenticator(QString::fromLatin1(AMQP_LOGIN), QString::fromLatin1(AMQP_PSWD)));
}
// Creates the SSL-capable socket (parented to the public client), requests the
// low-delay and keep-alive socket options, and wires the socket's signals to
// the client's private slots and forwarded public signals.
void QAmqpClientPrivate::initSocket()
{
    Q_Q(QAmqpClient);

    socket = new QSslSocket(q);
    socket->setSocketOption(QAbstractSocket::LowDelayOption, 1);
    socket->setSocketOption(QAbstractSocket::KeepAliveOption, 1);

    // Signals that are simply re-emitted by the public client.
    QObject::connect(socket, SIGNAL(stateChanged(QAbstractSocket::SocketState)),
                     q, SIGNAL(socketStateChanged(QAbstractSocket::SocketState)));
    QObject::connect(socket, SIGNAL(sslErrors(QList<QSslError>)),
                     q, SIGNAL(sslErrors(QList<QSslError>)));

    // Connection life-cycle and incoming data are handled by private slots.
    QObject::connect(socket, SIGNAL(connected()),
                     q, SLOT(_q_socketConnected()));
    QObject::connect(socket, SIGNAL(disconnected()),
                     q, SLOT(_q_socketDisconnected()));
    QObject::connect(socket, SIGNAL(readyRead()),
                     q, SLOT(_q_readyRead()));

    // Errors: the internal handler is connected before the forwarded signal,
    // so it is invoked first when the socket reports an error.
    QObject::connect(socket, SIGNAL(error(QAbstractSocket::SocketError)),
                     q, SLOT(_q_socketError(QAbstractSocket::SocketError)));
    QObject::connect(socket, SIGNAL(error(QAbstractSocket::SocketError)),
                     q, SIGNAL(socketError(QAbstractSocket::SocketError)));
}
void QAmqpClientPrivate::resetChannelState()
{
foreach (QString exchangeName, exchanges.channels()) {
QAmqpExchange *exchange =
qobject_cast<QAmqpExchange*>(exchanges.get(exchangeName));
if (exchange) exchange->d_ptr->resetInternalState();
}
foreach (QString queueName, queues.channels()) {
QAmqpQueue *queue =
qobject_cast<QAmqpQueue*>(queues.get(queueName));
if (queue) queue->d_ptr->resetInternalState();
}
}
void QAmqpClientPrivate::setUsername(const QString &username)
{
QAmqpAuthenticator *auth = authenticator.data();
if (auth && auth->type() == QLatin1String("AMQPLAIN")) {
QAmqpPlainAuthenticator *a = static_cast<QAmqpPlainAuthenticator*>(auth);
a->setLogin(username);
}
}
void QAmqpClientPrivate::setPassword(const QString &password)
{
QAmqpAuthenticator *auth = authenticator.data();
if (auth && auth->type() == QLatin1String("AMQPLAIN")) {
QAmqpPlainAuthenticator *a = static_cast<QAmqpPlainAuthenticator*>(auth);
a->setPassword(password);
}
}
void QAmqpClientPrivate::parseConnectionString(const QString &uri)
{
#if QT_VERSION > 0x040801
QUrl connectionString = QUrl::fromUserInput(uri);
#else
QUrl connectionString(uri, QUrl::TolerantMode);
#endif
if (connectionString.scheme() != AMQP_SCHEME &&
connectionString.scheme() != AMQP_SSL_SCHEME) {
qAmqpDebug() << Q_FUNC_INFO << "invalid scheme: " << connectionString.scheme();
return;
}
useSsl = (connectionString.scheme() == AMQP_SSL_SCHEME);
port = connectionString.port((useSsl ? AMQP_SSL_PORT : AMQP_PORT));
host = connectionString.host();
QString vhost = connectionString.path();
if (vhost.startsWith("/") && vhost.size() > 1)
vhost = vhost.mid(1);
#if QT_VERSION <= 0x050200
virtualHost = QUrl::fromPercentEncoding(vhost.toUtf8());
setPassword(QUrl::fromPercentEncoding(connectionString.password().toUtf8()));
setUsername(QUrl::fromPercentEncoding(connectionString.userName().toUtf8()));
#else
virtualHost = vhost;
setPassword(connectionString.password());
setUsername(connectionString.userName());
#endif
}
// Slot: (re)opens the connection to `host`:`port`.
// Cancels any pending reconnect, tears down a socket that is not in the
// unconnected state, then connects either encrypted or plain depending on
// `useSsl`.
void QAmqpClientPrivate::_q_connect()
{
    if (reconnectTimer)
        reconnectTimer->stop();

    const bool socketInUse = (socket->state() != QAbstractSocket::UnconnectedState);
    if (socketInUse) {
        qAmqpDebug() << Q_FUNC_INFO << "socket already connected, disconnecting..";
        _q_disconnect();
        // The connection must be closed explicitly here; otherwise it would
        // stay open until the close-ok reply is received.
        closeConnection();
    }

    qAmqpDebug() << "connecting to host: " << host << ", port: " << port;
    if (!useSsl) {
        socket->connectToHost(host, port);
        return;
    }
    socket->connectToHostEncrypted(host, port);
}
// Slot: starts a client-initiated disconnect.
// Cancels any pending reconnect. When the socket is not already unconnected,
// the receive buffer is discarded and close() is called with code 200 and the
// text "client disconnect"; otherwise the call is only logged.
void QAmqpClientPrivate::_q_disconnect()
{
    if (reconnectTimer)
        reconnectTimer->stop();

    if (socket->state() != QAbstractSocket::UnconnectedState) {
        buffer.clear();
        close(200, "client disconnect");
        return;
    }

    qAmqpDebug() << Q_FUNC_INFO << "already disconnected";
}
// private slots
// Slot: runs once the socket is connected.
// Cancels any pending reconnect, resets the reconnect back-off (unless a
// fixed reconnect timeout is configured) and writes the 8-byte protocol
// header: the ASCII letters "AMQP" followed by the bytes 0, 0, 9, 1.
void QAmqpClientPrivate::_q_socketConnected()
{
    if (reconnectTimer)
        reconnectTimer->stop();

    if (!reconnectFixedTimeout)
        timeout = 0;

    const char protocolHeader[8] = {'A', 'M', 'Q', 'P', 0, 0, 9, 1};
    socket->write(protocolHeader, sizeof(protocolHeader));
}
// Slot: runs when the socket reports that it is disconnected.
// Discards buffered input, resets the state of all exchanges and queues,
// clears the `connected` flag and always emits the client's disconnected()
// signal.
void QAmqpClientPrivate::_q_socketDisconnected()
{
    Q_Q(QAmqpClient);

    buffer.clear();
    resetChannelState();

    // Whatever the previous value was, the flag ends up false.
    connected = false;

    Q_EMIT q->disconnected();
}
void QAmqpClientPrivate::_q_heartbeat()
{
QAmqpHeartbeatFrame frame;
sendFrame(frame);
}
// Slot: handles any error reported by the socket.
//
// Steps, in order:
//  1. Advance the reconnect back-off, unless a fixed reconnect timeout is
//     configured: 1000 ms on the first error, then multiplied by 5 while the
//     current value is below 120000 ms (1000, 5000, 25000, 125000).
//  2. Record the socket's error text in `errorString` and log it.
//  3. Abort the socket if it is connected or still connecting.
//  4. If auto-reconnect is enabled, arm the reconnect timer with the current
//     back-off.
//
// `error` is the socket error code; every code is handled the same way.
void QAmqpClientPrivate::_q_socketError(QAbstractSocket::SocketError error)
{
    // All error codes share one code path, so the value itself is not used.
    Q_UNUSED(error);

    if (!reconnectFixedTimeout) {
        if (timeout <= 0)
            timeout = 1000;
        else if (timeout < 120000)
            timeout *= 5;
    }

    // Capture the message BEFORE aborting: closing a QIODevice resets its
    // error string, so reading it after abort() would lose the real cause.
    errorString = socket->errorString();
    qAmqpDebug() << "socket error: " << errorString;

    // per spec, on any error we need to close the socket immediately
    // and send no more data.
    const QAbstractSocket::SocketState state = socket->state();
    if (state == QAbstractSocket::ConnectedState ||
        state == QAbstractSocket::ConnectingState) {
        socket->abort();
    }

    if (autoReconnect && reconnectTimer) {
        qAmqpDebug() << "trying to reconnect after: " << timeout << "ms";
        reconnectTimer->start(timeout);
    }
}
void QAmqpClientPrivate::_q_readyRead()
{
Q_Q(QAmqpClient);
while (socket->bytesAvailable() >= QAmqpFrame::HEADER_SIZE) {
unsigned char headerData[QAmqpFrame::HEADER_SIZE];
socket->peek((char*)headerData, QAmqpFrame::HEADER_SIZE);
const quint32 payloadSiz
没有合适的资源?快使用搜索试试~ 我知道了~
C++/QT 使用RabbitMQ
共84个文件
dll:26个
qm:22个
h:18个
需积分: 42 12 下载量 94 浏览量
2022-11-18
20:05:00
上传
评论 1
收藏 20.14MB RAR 举报
温馨提示
文章配套源码及可执行程序demo
资源推荐
资源详情
资源评论
收起资源包目录
QtRabbitMQ-master.rar (84个子文件)
QtRabbitMQ-master
ReleaseFiles
iconengines
qsvgicon.dll 97KB
RabbitClient.exe 3.41MB
libEGL.dll 66KB
libgcc_s_seh-1.dll 73KB
Qt5Svg.dll 562KB
bearer
qgenericbearer.dll 138KB
qamqp.dll 234KB
imageformats
qwbmp.dll 68KB
qjpeg.dll 474KB
qtiff.dll 515KB
qtga.dll 70KB
qico.dll 82KB
qgif.dll 77KB
qicns.dll 96KB
qsvg.dll 73KB
qwebp.dll 666KB
Qt5Network.dll 2.47MB
platforms
qwindows.dll 2.72MB
libwinpthread-1.dll 51KB
libstdc++-6.dll 1.36MB
D3Dcompiler_47.dll 3.98MB
Qt5Core.dll 7.96MB
opengl32sw.dll 19.95MB
translations
qt_fi.qm 176KB
qt_fr.qm 162KB
qt_gd.qm 185KB
qt_ru.qm 194KB
qt_ko.qm 128KB
qt_ca.qm 180KB
qt_pl.qm 159KB
qt_de.qm 209KB
qt_bg.qm 161KB
qt_uk.qm 155KB
qt_en.qm 23B
qt_lv.qm 150KB
qt_hu.qm 157KB
qt_sk.qm 123KB
qt_cs.qm 171KB
qt_ja.qm 127KB
qt_da.qm 170KB
qt_zh_TW.qm 125KB
qt_ar.qm 156KB
qt_he.qm 135KB
qt_es.qm 161KB
qt_it.qm 157KB
styles
qwindowsvistastyle.dll 303KB
libGLESv2.dll 6.07MB
Qt5Widgets.dll 8.22MB
Qt5Gui.dll 9.39MB
src
wrabbitmq.h 1KB
RabbitClient.pro 635B
wrabbitmq.cpp 4KB
qrabbitmq.h 951B
qrabbitmq.cpp 2KB
QAMQP
qamqpauthenticator.h 2KB
qamqpclient.h 5KB
qamqpframe.cpp 13KB
qamqpexchange_p.h 1KB
QAMQP.pro.user 22KB
qamqpexchange.cpp 11KB
qamqpauthenticator.cpp 944B
qamqpchannelhash.cpp 3KB
qamqpqueue.cpp 19KB
qamqpexchange.h 4KB
qamqpframe_p.h 4KB
qamqptable.cpp 11KB
qamqpclient_p.h 3KB
qamqpqueue.h 4KB
qamqpglobal.h 3KB
qamqpqueue_p.h 2KB
qamqpmessage.cpp 3KB
qamqpmessage_p.h 524B
qamqpchannel.cpp 9KB
qamqpchannelhash_p.h 3KB
QAMQP.pro 1KB
qamqptable.h 1KB
qamqpclient.cpp 27KB
qamqpmessage.h 3KB
qamqpchannel_p.h 2KB
qamqpchannel.h 2KB
main.cpp 172B
mainwindow.ui 5KB
mainwindow.cpp 1KB
mainwindow.h 546B
共 84 条
- 1
资源评论
我有只猫叫蛋卷儿
- 粉丝: 107
- 资源: 6
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功