package rabbitmqpool;
import java.io.IOException;
import java.util.Enumeration;
import java.util.Vector;
import com.rabbitmq.client.Connection;
/**
* @ClassName: ConnectionPool
* @Description: RabbitMq连接池
* @date 2019年3月8日 上午11:01:16
* @version V1.0
**/
public class ConnectionPool {
private int initialConnections = 10; // 连接池的初始大小
private int incrementalConnections = 5;// 连接池自动增加的大小
private int maxConnections = 50; // 连接池最大的大小
private Vector<PooledConnection> connections = null; // 存放连接池中MQ连接的向量 , 初始时为 null
public ConnectionPool() {
}
/**
* 返回连接池的初始大小
*
* @return 初始连接池中可获得的连接数量
*/
public int getInitialConnections() {
return this.initialConnections;
}
/**
* 设置连接池的初始大小
*
* @param 用于设置初始连接池中连接的数量
*/
public void setInitialConnections(int initialConnections) {
this.initialConnections = initialConnections;
}
/**
* 返回连接池自动增加的大小
*
* @return 连接池自动增加的大小
*/
public int getIncrementalConnections() {
return this.incrementalConnections;
}
/**
* 设置连接池自动增加的大小
*
* @param 连接池自动增加的大小
*/
public void setIncrementalConnections(int incrementalConnections) {
this.incrementalConnections = incrementalConnections;
}
/**
* 返回连接池中最大的可用连接数量
*
* @return 连接池中最大的可用连接数量
*/
public int getMaxConnections() {
return this.maxConnections;
}
/**
* 设置连接池中最大可用的连接数量
*
* @param 设置连接池中最大可用的连接数量值
*/
public void setMaxConnections(int maxConnections) {
this.maxConnections = maxConnections;
}
/**
*
* 创建一个MQ连接池,连接池中的可用连接的数量采用类成员 initialConnections 中设置的值
*/
public synchronized void createPool() throws Exception {
// 确保连接池没有创建
// 如果连接池己经创建了,保存连接的向量 connections 不会为空
if (connections != null) {
return; // 如果己经创建,则返回
}
// 创建保存连接的向量 , 初始时有 0 个元素
connections = new Vector<PooledConnection>();
// 根据 initialConnections 中设置的值,创建连接。
createConnections(this.initialConnections);
System.out.println("RABBITMQ连接池创建成功! ");
}
/**
* 创建由 numConnections 指定数目的MQ连接 , 并把这些连接 放入 connections 向量中
* @param numConnections
* 要创建的MQ连接的数目
*/
private void createConnections(int numConnections) {
// 循环创建指定数目的MQ连接
for (int x = 0; x < numConnections; x++) {
// 是否连接池中的MQ连接的数量己经达到最大?最大值由类成员 maxConnections
// 指出,如果 maxConnections 为 0 或负数,表示连接数量没有限制。
// 如果连接数己经达到最大,即退出。
if (this.maxConnections > 0 && this.connections.size() >= this.maxConnections) {
break;
}
// add a new PooledConnection object to connections vector
// 增加一个连接到连接池中(向量 connections 中)
try {
connections.addElement(new PooledConnection(newConnection()));
} catch (Exception e) {
System.out.println(" 创建RABBITMQ连接失败! " + e.getMessage());
System.out.println(e.toString());
}
// System.out.println("RABBITMQ连接己创建 ......");
}
}
/**
* 创建一个新的MQ连接并返回它
*
* @return 返回一个新创建的MQ连接
*/
private Connection newConnection() {
return MQConnUtil.getConnection(); // 返回创建的新的MQ连接
}
/**
* 通过调用 getFreeConnection() 函数返回一个可用的MQ连接 , 如果当前没有可用的MQ连接,并且更多的MQ连接不能创
* 建(如连接池大小的限制),此函数等待一会再尝试获取。
*
* @return 返回一个可用的MQ连接对象
*/
public synchronized Connection getConnection() {
// 确保连接池己被创建
if (connections == null) {
//连接池还没创建,则返回 null
return null;
}
//获得一个可用的MQ连接
Connection conn = getFreeConnection();
// 如果目前没有可以使用的连接,即所有的连接都在使用中
while (conn == null) {
// 等一会再试
wait(250);
conn = getFreeConnection(); // 重新再试,直到获得可用的连接,如果
// getFreeConnection() 返回的为 null
// 则表明创建一批连接后也不可获得可用连接
}
return conn;// 返回获得的可用的连接
}
/**
* 本函数从连接池向量 connections 中返回一个可用的的MQ连接,如果 当前没有可用的MQ连接,本函数则根据
* incrementalConnections 设置 的值创建几个MQ连接,并放入连接池中。 如果创建后,所有的连接仍都在使用中,则返回 null
*
* @return 返回一个可用的MQ连接
*/
private Connection getFreeConnection() {
// 从连接池中获得一个可用的MQ连接
Connection conn = findFreeConnection();
if (conn == null) {
// 如果目前连接池中没有可用的连接
// 创建一些连接
createConnections(incrementalConnections);
// 重新从池中查找是否有可用连接
conn = findFreeConnection();
if (conn == null) {
// 如果创建连接后仍获得不到可用的连接,则返回 null
return null;
}
}
return conn;
}
/**
* 查找连接池中所有的连接,查找一个可用的MQ连接, 如果没有可用的连接,返回 null
*
* @return 返回一个可用的MQ连接
*/
private Connection findFreeConnection() {
Connection conn = null;
PooledConnection pConn = null;
// 获得连接池向量中所有的对象
Enumeration<PooledConnection> enumerate = connections.elements();
// 遍历所有的对象,看是否有可用的连接
while (enumerate.hasMoreElements()) {
pConn = (PooledConnection) enumerate.nextElement();
if (!pConn.isBusy()) {
// 如果此对象不忙,则获得它的MQ连接并把它设为忙
conn = pConn.getConnection();
pConn.setBusy(true);
// 测试此连接是否可用
break; // 己经找到一个可用的连接,退出
}
}
return conn;// 返回找到到的可用连接
}
/**
* 此函数返回一个MQ连接到连接池中,并把此连接置为空闲。 所有使用连接池获得的MQ连接均应在不使用此连接时返回它。
*
* @param 需返回到连接池中的连接对象
*/
public void returnConnection(Connection conn) {
// 确保连接池存在,如果连接没有创建(不存在),直接返回
if (connections == null) {
System.out.println(" 连接池不存在,无法返回此连接到连接池中 !");
return;
}
PooledConnection pConn = null;
Enumeration<PooledConnection> enumerate = connections.elements();
// 遍历连接池中的所有连接,找到这个要返回的连接对象
while (enumerate.hasMoreElements()) {
pConn = (PooledConnection) enumerate.nextElement();
// 先找到连接池中的要返回的连接对象
if (conn == pConn.getConnection()) {
// 找到了 , 设置此连接为空闲状态
pConn.setBusy(false);
break;
}
}
}
/**
* 刷新连接池中所有的连接对象
*
*/
public synchronized void refreshConnections() {
// 确保连接池己创新存在
if (connections == null) {
System.out.println(" 连接池不存在,无法刷新 !");
return;
}
PooledConnection pConn = null;
Enumeration<PooledConnection> enumerate = connections.elements();
while (enumerate.hasMoreElements()) {
// 获得一个连接对象
pConn = (PooledConnection) enumerate.nextElement();
// 如果对象忙则等 5 秒 ,5 秒后直接刷新
if (pConn.isBusy()) {
wait(5000); // 等 5 秒
}
// 关闭此连接,用一个新的连接代替它。
closeConnection(pConn.getConnection());
pConn.setConnection(newConnection());
pConn.setBusy(false);
}
}
/**
* 关闭连接池中所有的连接,并清空连接池。
*/
public synchronized void closeConnectionPool() {
// 确保连接池存在,如果不存在,返回
if (connections == null) {
System.out.println(" 连接池不存在,无法关闭 !");
return;
}
PooledConnection pConn = null;
Enumeration<PooledConnection> enumerate = connections.elements();
while (enumerate.hasMoreElements()) {
pConn = (PooledConnection) enumerate.nextElement();
// 如果忙,等 5 秒
if (pConn.isBusy()) {
wait(5000); // 等 5 秒
}
// 5 秒后直接关闭它
closeConnection(pConn.getConnection());
// 从连接池向量中删除它
connections.removeElement(pConn);
}
// 置连接池为空
connections
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
rabbitmqpooldemo.rar (17个子文件)
rabbitmqpooldemo
bin
rabbitmqpool
ConnectionTest.class 2KB
ConnectionPool$PooledConnection.class 1KB
Consumer.class 2KB
ConnectionPool.class 5KB
ConnectionPoolUtils.class 825B
MQConnUtil.class 914B
Producer.class 2KB
.settings
org.eclipse.jdt.core.prefs 598B
src
rabbitmqpool
Producer.java 1KB
ConnectionPool.java 9KB
MQConnUtil.java 804B
ConnectionPoolUtils.java 686B
ConnectionTest.java 2KB
Consumer.java 1KB
.project 392B
.classpath 365B
lib
amqp-client-3.5.0.jar 405KB
共 17 条
- 1
资源评论
御用备胎
- 粉丝: 108
- 资源: 5
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功