package com.wxer.controller;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class MqProducer {
static private String sendTopic = "MyTestTopic"; //消息推送主题
static private String sendMsg = "rocketmq message"; //推送消息负载
static private int sendCount = 10; //消息推送条数
static private String strNameSvrAddr = "192.168.0.35:9876"; //namesrv地址
static private DefaultMQProducer producer = null; //生产者
public static void main(String[] args) throws MQClientException, InterruptedException {
//创建初始化生产者
InitMQProducer();
//启动生产者
if (!StartMQProducer()) {
return;
}
//同步发送消息
long startTime1 = System.currentTimeMillis();
SyncSend();
long endTime1 = System.currentTimeMillis();
System.out.println("同步阻塞发送 " + sendCount + "消息, 耗时"+ (endTime1 - startTime1) + "ms");
//异步非阻塞发送消息
// long startTime2 = System.currentTimeMillis();
// AsyncSend();
// long endTime2 = System.currentTimeMillis();
// System.out.println("异步非阻塞发送消息 " + sendCount + "消息, 耗时"+ (endTime2 - startTime2) + "ms");
//Thread.sleep(50000);
//单向发送消息
// long startTime3 = System.currentTimeMillis();
// SendOneway();
// long endTime3 = System.currentTimeMillis();
// System.out.println("单向发送消息 " + sendCount + "消息, 耗时"+ (endTime3 - startTime3) + "ms");
//关闭消费者
producer.shutdown();
}
//创建初始化生产者
static private void InitMQProducer() {
//创建生产者组
if (producer == null){
producer = new DefaultMQProducer("ProducerGroup01");
}
//设置nameserver地址
producer.setNamesrvAddr(strNameSvrAddr);
producer.setAsyncSenderExecutor(Executors.newFixedThreadPool(100));
}
//启动生产者
static private boolean StartMQProducer(){
if (producer == null){
return false;
}
try {
producer.start();
return true;
}catch (MQClientException e){
e.printStackTrace();
}
return false;
}
//方式一:同步阻塞式发消息,发送完消息,必须等broker返回一个发送成功的消息,这种方式消息发送的安全性可以得到保证,但是效率慢
static public void SyncSend(/*String topic, String tags, String data*/) throws InterruptedException {
// if (topic == null || data == null){
// return;
// }
for (int i = 0; i < sendCount; i++) {
try {
Message msg = new Message(sendTopic, (sendMsg+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.print(i+".......");
} catch (Exception e) {
e.printStackTrace();
System.out.print("send error.............................");
}
}
}
//顺序发送,效率慢
static public void SendInOrder() throws InterruptedException {
for (int i = 0; i < sendCount; i++) {
try {
Message msg = new Message(sendTopic, (sendMsg+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
Integer id = (Integer)o;
int index = id % list.size();
return list.get(index);
}
}, 0);
} catch (Exception e) {
e.printStackTrace();
System.out.print("send error.............................");
}
}
}
//方式二:异步非阻塞式发消息,发送完消息,不等broker返回一个发送成功的消息
//获得broker的确认信息,要采用监听的形式,这种方式消息发送的安全性无法得到保证,但是速度快
static public void AsyncSend() throws InterruptedException {
//final Semaphore semaphore = new Semaphore(sendCount);
producer.setRetryTimesWhenSendAsyncFailed(5);
final long startTime = System.currentTimeMillis();
for (int i = 0; i < sendCount; i++) {
try {
Message msg = new Message(sendTopic, (sendMsg+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
final int finalI = i;
producer.send(msg, new SendCallback() {
//发送成功
public void onSuccess(SendResult sendResult) {
if (finalI == sendCount-1){
long endTime = System.currentTimeMillis();
System.out.println("异步非阻塞发送消息 " + finalI + "消息, 耗时"+ (endTime - startTime) + "ms");
}
}
//发送失败
public void onException(Throwable throwable) {
System.out.println("第 " + finalI + " 条消息发送失败");
}
});
} catch (Exception e) {
e.printStackTrace();
System.out.println("send error.............................");
}
}
//semaphore.acquire(sendCount);
}
//方式三:单向发送消息,不关心有没有发送成功。速度最快
static public void SendOneway() throws InterruptedException {
for (int i = 0; i < sendCount; i++) {
try {
Message msg = new Message(sendTopic, (sendMsg+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);
Thread.sleep(1);
} catch (Exception e) {
e.printStackTrace();
System.out.print("send error.............................");
}
}
}
}
评论0
最新资源