package cc.seedland.sdk.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.net.SocketException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Description: MQ异步发送
* @Author zhoutianwei
* @Date 2018/1/11 15:28
* @Version V1.0
*/
public class RabbitMQAsyncProducer {
private RabbitMQConnectionFactory connectionFactory;
private ExecutorService executorService;
private RabbitDataQueue dataQueue;
private Channel channel;
private ConcurrentHashMap<Long, RabbitDataItem> pendingItems;
private MessageWork messageWorker;
public RabbitMQAsyncProducer(RabbitMQConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
dataQueue = new RabbitDataQueue();
pendingItems = new ConcurrentHashMap<Long, RabbitDataItem>();
}
protected void waitForConnection() throws InterruptedException, IOException {
connectionFactory.waitForConnection();
channel = connectionFactory.getChannel();
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long l, boolean b) throws IOException {
//是否批量
if (b) {
RabbitMQAsyncProducer.this.removeItemsUpto(l);
} else {
RabbitMQAsyncProducer.this.removeItem(l);
}
}
@Override
public void handleNack(long l, boolean b) throws IOException {
//是否批量
if (b) {
RabbitMQAsyncProducer.this.requeueItemsUpto(l);
} else {
RabbitMQAsyncProducer.this.requeueItem(l);
}
}
});
}
protected void requeueItemsUpto(long deliveryTag) {
synchronized (pendingItems) {
Iterator<Map.Entry<Long, RabbitDataItem>> it = pendingItems.entrySet()
.iterator();
while (it.hasNext()) {
Map.Entry<Long, RabbitDataItem> entry = it.next();
if (entry.getKey() <= deliveryTag) {
dataQueue.add(entry.getValue());
it.remove();
}
}
}
}
protected void removeItemsUpto(long deliveryTag) {
synchronized (pendingItems) {
Iterator<Map.Entry<Long, RabbitDataItem>> it = pendingItems.entrySet()
.iterator();
while (it.hasNext()) {
Map.Entry<Long, RabbitDataItem> entry = it.next();
if (entry.getKey() <= deliveryTag) {
it.remove();
}
}
}
}
protected void requeueItem(long deliveryTag) {
synchronized (pendingItems) {
RabbitDataItem item = pendingItems.get(deliveryTag);
pendingItems.remove(deliveryTag);
dataQueue.add(item);
}
}
protected void removeItem(long deliveryTag) {
synchronized (pendingItems) {
pendingItems.remove(deliveryTag);
}
}
public void send(RabbitDataItem data) {
synchronized (dataQueue) {
dataQueue.add(data);
dataQueue.notify();
}
}
public void startAsyncPublisher() {
executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
@Override
public void run() {
try {
for (; ; ) {
waitForConnection();
publishFromLocalQueue();
connectionFactory.closeConnection();
}
} catch (Exception e) {
e.printStackTrace();
connectionFactory.closeConnection();
}
}
});
}
public void stopAsyncPublisher() {
executorService.shutdownNow();
}
protected void publishFromLocalQueue() throws InterruptedException {
RabbitDataItem item = null;
try {
for (; ; ) {
synchronized (dataQueue) {
if (dataQueue.isEmpty()) {
dataQueue.wait(1000);
if (dataQueue.isEmpty()) {
connectionFactory.closeConnection();
System.out.println("------> RabbitMQ:dataQueue waited...");
dataQueue.wait();
System.out.println("------> RabbitMQ:dataQueue notify...");
waitForConnection();
}
}
}
item = dataQueue.peek();
channel.exchangeDeclare(item.getExchange(), item.getExchangeType().toString(), true);
channel.queueDeclare(item.getQueue(), true,
false, false, null);
channel.queueBind(item.getQueue(), item.getExchange(),
item.getRoutingKey());
long deliveryTag = channel.getNextPublishSeqNo();
channel.basicPublish(item.getExchange(), item.getQueue(), item.getProperties(), item
.getData().getBytes());
//正在发生,等待确认
pendingItems.put(deliveryTag, item);
dataQueue.remove();
if (Thread.interrupted()) {
throw new InterruptedException();
}
}
} catch (AlreadyClosedException e) {
//建立连接后中断,不处理队列,等待重连
e.printStackTrace();
} catch (SocketException e) {
//建立连接后中断,不处理队列,等待重连
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
//处理异常
if (messageWorker != null) {
messageWorker.handleException(item, e);
}
System.out.println("------> RabbitMQ message refused:" + item.toString());
dataQueue.remove();
}
}
public void setMessageWorker(MessageWork messageWorker) {
this.messageWorker = messageWorker;
}
}